fix(server/api/getobjects): handle closing of client stream (#2669)
This commit is contained in:
+37
-20
@@ -1,14 +1,12 @@
|
||||
'use strict'
|
||||
const zlib = require('zlib')
|
||||
const { corsMiddleware } = require('@/modules/core/configs/cors')
|
||||
import zlib from 'zlib'
|
||||
import { corsMiddleware } from '@/modules/core/configs/cors'
|
||||
import type { Application } from 'express'
|
||||
import { validatePermissionsReadStream } from '@/modules/core/rest/authUtils'
|
||||
import { SpeckleObjectsStream } from '@/modules/core/rest/speckleObjectsStream'
|
||||
import { getObjectsStream } from '@/modules/core/services/objects'
|
||||
import { pipeline, PassThrough } from 'stream'
|
||||
|
||||
const { validatePermissionsReadStream } = require('./authUtils')
|
||||
const { SpeckleObjectsStream } = require('./speckleObjectsStream')
|
||||
const { getObjectsStream } = require('../services/objects')
|
||||
|
||||
const { pipeline, PassThrough } = require('stream')
|
||||
|
||||
module.exports = (app) => {
|
||||
export default (app: Application) => {
|
||||
app.options('/api/getobjects/:streamId', corsMiddleware())
|
||||
|
||||
app.post('/api/getobjects/:streamId', corsMiddleware(), async (req, res) => {
|
||||
@@ -43,38 +41,57 @@ module.exports = (app) => {
|
||||
res,
|
||||
(err) => {
|
||||
if (err) {
|
||||
req.log.error(err, `App error streaming objects`)
|
||||
} else {
|
||||
req.log.info(
|
||||
{
|
||||
childCount: childrenList.length,
|
||||
mbWritten: gzipStream.bytesWritten / 1000000
|
||||
},
|
||||
'Streamed {childCount} objects (size: {mbWritten} MB)'
|
||||
)
|
||||
switch (err.code) {
|
||||
case 'ERR_STREAM_PREMATURE_CLOSE':
|
||||
req.log.debug({ err }, 'Stream to client has prematurely closed')
|
||||
break
|
||||
default:
|
||||
req.log.error(err, 'App error streaming objects')
|
||||
break
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
req.log.info(
|
||||
{
|
||||
childCount: childrenList.length,
|
||||
mbWritten: gzipStream.bytesWritten / 1000000
|
||||
},
|
||||
'Streamed {childCount} objects (size: {mbWritten} MB)'
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
const cSize = 1000
|
||||
try {
|
||||
for (let cStart = 0; cStart < childrenList.length; cStart += cSize) {
|
||||
if (!speckleObjStream.writable) break
|
||||
const childrenChunk = childrenList.slice(cStart, cStart + cSize)
|
||||
|
||||
const dbStream = await getObjectsStream({
|
||||
streamId: req.params.streamId,
|
||||
objectIds: childrenChunk
|
||||
})
|
||||
|
||||
const speckleObjStreamCloseHandler = () => {
|
||||
dbStream.destroy()
|
||||
}
|
||||
|
||||
speckleObjStream.once('close', speckleObjStreamCloseHandler)
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
dbStream.pipe(speckleObjStream, { end: false })
|
||||
dbStream.once('end', resolve)
|
||||
dbStream.once('error', reject)
|
||||
})
|
||||
|
||||
speckleObjStream.removeListener('close', speckleObjStreamCloseHandler)
|
||||
}
|
||||
} catch (ex) {
|
||||
req.log.error(ex, `DB Error streaming objects`)
|
||||
speckleObjStream.emit('error', new Error('Database streaming error'))
|
||||
} finally {
|
||||
speckleObjStream.end()
|
||||
}
|
||||
speckleObjStream.end()
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user