164 lines
5.9 KiB
TypeScript
164 lines
5.9 KiB
TypeScript
import zlib from 'zlib'
|
|
import { corsMiddleware } from '@/modules/core/configs/cors'
|
|
import type { Application } from 'express'
|
|
import { SpeckleObjectsStream } from '@/modules/core/rest/speckleObjectsStream'
|
|
import { Duplex, PassThrough, pipeline } from 'stream'
|
|
import { getObjectsStreamFactory } from '@/modules/core/repositories/objects'
|
|
import { db } from '@/db/knex'
|
|
import { validatePermissionsReadStreamFactory } from '@/modules/core/services/streams/auth'
|
|
import { getStreamFactory } from '@/modules/core/repositories/streams'
|
|
import { authorizeResolver, validateScopes } from '@/modules/shared'
|
|
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
|
|
import { UserInputError } from '@/modules/core/errors/userinput'
|
|
import { ensureError } from '@speckle/shared'
|
|
import chain from 'stream-chain'
|
|
import { get } from 'lodash'
|
|
import { getFeatureFlags } from '@/modules/shared/helpers/envHelper'
|
|
import { DatabaseError } from '@/modules/shared/errors'
|
|
|
|
// Feature flag, read once at module load. When truthy, the objects response is
// assembled with `stream-chain` instead of Node's `stream.pipeline`.
const FF_OBJECTS_STREAMING_FIX = getFeatureFlags().FF_OBJECTS_STREAMING_FIX
|
|
|
|
/**
 * Registers the `/api/getobjects/:streamId` REST endpoints on the given Express app.
 *
 * - `OPTIONS` is answered by the CORS middleware.
 * - `POST` expects `req.body.objects` to be a JSON string that decodes to an array
 *   of object IDs. Once the stream read-permission check passes, the objects are
 *   read from the project's database in chunks of 1000 IDs and streamed back
 *   gzip-compressed. The body is `text/plain` when the `Accept` header is exactly
 *   `text/plain`, otherwise `application/json`.
 *
 * @param app Express application to attach the routes to.
 * @throws UserInputError (from the POST handler) when `req.body.objects` cannot be
 *   parsed as JSON or does not decode to an array.
 */
export default (app: Application) => {
  const validatePermissionsReadStream = validatePermissionsReadStreamFactory({
    getStream: getStreamFactory({ db }),
    validateScopes,
    authorizeResolver
  })

  app.options('/api/getobjects/:streamId', corsMiddleware())

  app.post('/api/getobjects/:streamId', corsMiddleware(), async (req, res) => {
    req.log = req.log.child({
      userId: req.context.userId || '-',
      streamId: req.params.streamId
    })
    const hasStreamAccess = await validatePermissionsReadStream(
      req.params.streamId,
      req
    )
    if (!hasStreamAccess.result) {
      return res.status(hasStreamAccess.status).end()
    }

    const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
    const getObjectsStream = getObjectsStreamFactory({ db: projectDb })

    let childrenList: string[]
    try {
      const parsed: unknown = JSON.parse(req.body.objects)
      // Valid JSON that is not an array (e.g. `"5"` or `"{}"`) is rejected here so
      // it is reported as a user input error instead of being chunked and queried.
      if (!Array.isArray(parsed)) {
        throw new TypeError('The "objects" property did not decode to an array')
      }
      childrenList = parsed
    } catch (err) {
      throw new UserInputError(
        'Invalid body. Please provide a JSON object containing the property "objects" of type string. The value must be a JSON string representation of an array of object IDs.',
        ensureError(err, 'Unknown JSON parsing issue')
      )
    }
    const simpleText = req.headers.accept === 'text/plain'

    res.writeHead(200, {
      'Content-Encoding': 'gzip',
      'Content-Type': simpleText ? 'text/plain; charset=UTF-8' : 'application/json'
    })

    // "output" stream, connected to res with `pipeline` (auto-closing res)
    const speckleObjStream = new SpeckleObjectsStream(simpleText)
    const gzipStream = zlib.createGzip()

    // Shared by both pipeline variants: a premature close by the client is logged
    // at info level, any other stream error at error level.
    const logStreamError = (err: Error) => {
      switch (get(err, 'code')) {
        case 'ERR_STREAM_PREMATURE_CLOSE':
          req.log.info({ err }, 'Stream to client has prematurely closed')
          break
        default:
          req.log.error(err, 'App error streaming objects')
          break
      }
    }

    // Log payload: number of requested object IDs and the `bytesWritten` counter
    // of the gzip stream, expressed in MB.
    const streamStats = () => ({
      childCount: childrenList.length,
      mbWritten: gzipStream.bytesWritten / 1000000
    })

    let chainPipeline: Duplex

    if (FF_OBJECTS_STREAMING_FIX) {
      // From node documentation: https://nodejs.org/docs/latest-v18.x/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback
      // > stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.
      // As workaround, we are using chain from 'stream-chain'
      // Some more conversation around this: https://stackoverflow.com/questions/61072482/node-closing-streams-properly-after-pipeline
      chainPipeline = chain([
        speckleObjStream,
        gzipStream,
        new PassThrough({ highWaterMark: 16384 * 31 }),
        res
      ])
      chainPipeline.on('error', (err) => {
        if (err) {
          logStreamError(err)
        }
        // Previously this statement sat behind an unconditional `return` in the
        // `if (err)` branch and therefore never ran for an actual error.
        req.log.info(
          streamStats(),
          'Encountered error. Prior to error, we streamed {childCount} objects (size: {mbWritten} MB)'
        )
      })
    } else {
      pipeline(
        speckleObjStream,
        gzipStream,
        new PassThrough({ highWaterMark: 16384 * 31 }),
        res,
        (err) => {
          if (err) {
            logStreamError(err)
            return
          }

          req.log.info(
            streamStats(),
            'Streamed {childCount} objects (size: {mbWritten} MB)'
          )
        }
      )
    }

    const cSize = 1000
    try {
      for (let cStart = 0; cStart < childrenList.length; cStart += cSize) {
        // Stop querying as soon as the output side can no longer accept data.
        if (!speckleObjStream.writable || res.destroyed) break
        const childrenChunk = childrenList.slice(cStart, cStart + cSize)

        const dbStream = await getObjectsStream({
          streamId: req.params.streamId,
          objectIds: childrenChunk
        })

        await new Promise<void>((resolve, reject) => {
          // https://knexjs.org/faq/recipes.html#manually-closing-streams
          // https://github.com/knex/knex/issues/2324
          // The DB stream has to be closed explicitly when the client goes away.
          // The methods must actually be invoked: the previous code only created
          // bound copies of them, which did nothing.
          const abortDbStream = () => {
            dbStream.end()
            dbStream.destroy()
          }
          // Listen on `res`, not `req`: per the Node docs the response 'close'
          // event covers a prematurely terminated connection, whereas on recent
          // Node versions the request 'close' event signals request completion.
          res.once('close', abortDbStream)

          // Settles the promise and removes this chunk's 'close' listener so
          // listeners do not pile up on `res` across chunks. Repeated calls are
          // harmless: a promise settles only once and `off` is idempotent.
          const settle = (err?: unknown) => {
            res.off('close', abortDbStream)
            if (err) {
              reject(err)
            } else {
              resolve()
            }
          }

          dbStream.once('end', () => settle())
          // A destroyed stream emits 'close' without 'end'; without this listener
          // the handler would wait forever after an aborted request.
          dbStream.once('close', () => settle())
          dbStream.once('error', (err: unknown) => settle(err))

          if (FF_OBJECTS_STREAMING_FIX) {
            dbStream.pipe(chainPipeline, { end: false })
          } else {
            dbStream.pipe(speckleObjStream, { end: false })
          }
        })
      }
    } catch (ex) {
      req.log.error(ex, `DB Error streaming objects`)
      speckleObjStream.emit('error', new DatabaseError('Database streaming error'))
    } finally {
      // `{ end: false }` above keeps the output open between chunks, so it has to
      // be ended explicitly once all chunks are done (or streaming was aborted).
      speckleObjStream.end()
    }
  })
}
|