Files
speckle-server/packages/server/modules/core/rest/upload.ts
T
2025-09-05 13:24:56 +02:00

432 lines
16 KiB
TypeScript

import zlib from 'zlib'
import { corsMiddlewareFactory } from '@/modules/core/configs/cors'
import Busboy from 'busboy'
import { maximumObjectUploadFileSizeMb } from '@/modules/shared/helpers/envHelper'
import { ObjectHandlingError } from '@/modules/core/errors/object'
import { estimateStringMegabyteSize } from '@/modules/core/utils/chunking'
import { toMegabytesWith1DecimalPlace } from '@/modules/core/utils/formatting'
import type { Router } from 'express'
import { createObjectsBatchedAndNoClosuresFactory } from '@/modules/core/services/objects/management'
import { storeObjectsIfNotFoundFactory } from '@/modules/core/repositories/objects'
import { validatePermissionsWriteStreamFactory } from '@/modules/core/services/streams/auth'
import { authorizeResolver, validateScopes } from '@/modules/shared'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import type { ExecuteHooks } from '@/modules/core/hooks'
import { logWithErr } from '@/observability/utils/logLevels'
const MAX_FILE_SIZE = maximumObjectUploadFileSizeMb() * 1024 * 1024
export default (app: Router, { executeHooks }: { executeHooks: ExecuteHooks }) => {
const validatePermissionsWriteStream = validatePermissionsWriteStreamFactory({
validateScopes,
authorizeResolver
})
app.options('/objects/:streamId', corsMiddlewareFactory())
app.post('/objects/:streamId', corsMiddlewareFactory(), async (req, res) => {
const calculateLogMetadata = (params: {
batchSizeMb: number
start: number
batchStartTime: number
totalObjectsProcessed: number
}) => {
return {
batchSizeMb: params.batchSizeMb,
maxFileSizeMb: toMegabytesWith1DecimalPlace(MAX_FILE_SIZE),
elapsedTimeMs: Date.now() - params.start,
batchElapsedTimeMs: Date.now() - params.batchStartTime,
totalObjectsProcessed: params.totalObjectsProcessed
}
}
req.log = req.log.child({
userId: req.context.userId || '-',
streamId: req.params.streamId
})
const start = Date.now()
const hasStreamAccess = await validatePermissionsWriteStream(
req.params.streamId,
req
)
if (!hasStreamAccess.result) {
return res.status(hasStreamAccess.status).end()
}
await executeHooks('onCreateObjectRequest', {
projectId: req.params.streamId
})
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
const objectInsertionService = createObjectsBatchedAndNoClosuresFactory({
storeObjectsIfNotFoundFactory: storeObjectsIfNotFoundFactory({
db: projectDb
})
})
let busboy
try {
busboy = Busboy({ headers: req.headers })
} catch (e) {
req.log.info(
{ err: e },
'Failed to parse request headers and body content as valid multipart/form-data.'
)
return res
.status(400)
.send(
'Failed to parse request headers and body content as valid multipart/form-data.'
)
}
let totalObjectsProcessed = 0
const promises: Promise<boolean | void | string[]>[] = []
let requestDropped = false
busboy.on('file', (name, file, info) => {
const { mimeType } = info
if (requestDropped) return
if (mimeType === 'application/gzip') {
const buffer: Uint8Array[] = []
file.on('data', (data) => {
if (data) buffer.push(data)
})
file.on('end', async () => {
req.log.debug(
`File upload of the multipart form has reached an end of file (EOF) boundary. The mimetype of the file is '${mimeType}'.`
)
if (requestDropped) return
const batchStartTime = Date.now()
let objs = []
const gzippedBuffer = Buffer.concat(buffer)
if (gzippedBuffer.length > MAX_FILE_SIZE) {
req.log.info(
calculateLogMetadata({
batchSizeMb: toMegabytesWith1DecimalPlace(gzippedBuffer.length),
start,
batchStartTime,
totalObjectsProcessed
}),
'Upload error: Batch size too large ({batchSizeMb} > {maxFileSizeMb}). Error occurred after {elapsedTimeMs}ms. This batch took {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.'
)
if (!requestDropped)
res
.status(413)
.send(
`File size too large (${gzippedBuffer.length} > ${MAX_FILE_SIZE})`
)
requestDropped = true
}
const gunzippedBuffer = zlib
.gunzipSync(new Uint8Array(gzippedBuffer))
.toString()
const gunzippedBufferMegabyteSize =
estimateStringMegabyteSize(gunzippedBuffer)
if (gunzippedBufferMegabyteSize > MAX_FILE_SIZE) {
req.log.info(
calculateLogMetadata({
batchSizeMb: gunzippedBufferMegabyteSize,
start,
batchStartTime,
totalObjectsProcessed
}),
'Upload error: batch size too large ({batchSizeMb} > {maxFileSizeMb}). Error occurred after {elapsedTimeMs}ms. This batch took {batchElapsedTimeMs}ms. Total objects processed before error: {totalObjectsProcessed}.'
)
if (!requestDropped)
res
.status(413)
.send(
`File size too large (${gunzippedBufferMegabyteSize} > ${MAX_FILE_SIZE})`
)
requestDropped = true
}
try {
objs = JSON.parse(gunzippedBuffer)
} catch (e) {
req.log.info(
{
...calculateLogMetadata({
batchSizeMb: gunzippedBufferMegabyteSize,
start,
batchStartTime,
totalObjectsProcessed
}),
err: e
},
'Upload error: Batch not in JSON format. Error occurred after {elapsedTimeMs}ms. This batch of objects took {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.'
)
if (!requestDropped) res.status(400).send('Failed to parse data.')
requestDropped = true
}
// last = objs[objs.length - 1]
totalObjectsProcessed += objs.length
let previouslyAwaitedPromises = 0
while (previouslyAwaitedPromises !== promises.length) {
previouslyAwaitedPromises = promises.length
await Promise.all(promises)
}
const promise = objectInsertionService({
streamId: req.params.streamId,
objects: objs,
logger: req.log
}).catch((e) => {
logWithErr(
req.log,
e,
{
...calculateLogMetadata({
batchSizeMb: gunzippedBufferMegabyteSize,
start,
batchStartTime,
totalObjectsProcessed
}),
objectCount: objs.length
},
`Upload error when inserting objects into database. Number of objects: {objectCount}. This batch took {batchElapsedTimeMs}ms. Error occurred after {elapsedTimeMs}ms. Total objects processed before error: {totalObjectsProcessed}.`
)
if (!requestDropped) {
switch (e.constructor) {
case ObjectHandlingError:
res
.status(400)
.send(`Error inserting object in the database: ${e.message}`)
break
default:
res
.status(400)
.send(
'Error inserting object in the database. Check server logs for details'
)
}
}
requestDropped = true
})
promises.push(promise)
await promise
req.log.info(
{
objectCount: objs.length,
elapsedTimeMs: Date.now() - start,
batchElapsedTimeMs: Date.now() - batchStartTime,
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024,
uploadedSizeMB: toMegabytesWith1DecimalPlace(gunzippedBuffer.length),
requestDropped,
totalObjectsProcessed
},
'Uploaded batch of {objectCount} objects in {batchElapsedTimeMs}ms. Total objects processed so far: {totalObjectsProcessed} in a total of {elapsedTimeMs}ms.'
)
})
} else if (
mimeType === 'text/plain' ||
mimeType === 'application/json' ||
mimeType === 'application/octet-stream'
) {
let buffer = ''
file.on('data', (data) => {
if (data) buffer += data
})
file.on('end', async () => {
if (requestDropped) return
const batchStartTime = Date.now()
let objs = []
if (buffer.length > MAX_FILE_SIZE) {
req.log.info(
calculateLogMetadata({
batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length),
start,
batchStartTime,
totalObjectsProcessed
}),
'Upload error: Batch size too large ({batchSizeMb} > {maxFileSizeMb}). Error occurred after {elapsedTimeMs}ms. This batch took {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.'
)
if (!requestDropped)
res
.status(413)
.send(`File size too large (${buffer.length} > ${MAX_FILE_SIZE})`)
requestDropped = true
}
try {
objs = JSON.parse(buffer)
} catch (e) {
req.log.info(
{
...calculateLogMetadata({
batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length),
start,
batchStartTime,
totalObjectsProcessed
}),
err: e
},
'Upload error: Batch not in JSON format. Error occurred after {elapsedTimeMs}ms. This batch failed after {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.'
)
if (!requestDropped)
res.status(400).send('Failed to parse data. Batch is not in JSON format.')
requestDropped = true
}
if (!Array.isArray(objs)) {
req.log.info(
calculateLogMetadata({
batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length),
start,
batchStartTime,
totalObjectsProcessed
}),
'Upload error: Batch not an array. Error occurred after {elapsedTimeMs}ms. This batch failed after {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.'
)
if (!requestDropped)
res
.status(400)
.send(
'Failed to parse data. Batch is expected to be wrapped in a JSON array.'
)
requestDropped = true
}
//FIXME should we exit here if requestDropped is true
totalObjectsProcessed += objs.length
req.log.debug(
{
...calculateLogMetadata({
batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length),
start,
batchStartTime,
totalObjectsProcessed
}),
objectCount: objs.length
},
'Total objects, including current pending batch of {objectCount} objects, processed so far is {totalObjectsProcessed}. This batch has taken {batchElapsedTimeMs}ms. Total time elapsed is {elapsedTimeMs}ms.'
)
let previouslyAwaitedPromises = 0
while (previouslyAwaitedPromises !== promises.length) {
previouslyAwaitedPromises = promises.length
await Promise.all(promises)
}
const promise = objectInsertionService({
streamId: req.params.streamId,
objects: objs,
logger: req.log
}).catch((e) => {
logWithErr(
req.log,
e,
calculateLogMetadata({
batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length),
start,
batchStartTime,
totalObjectsProcessed
}),
`Upload error when inserting objects into database. Number of objects: {objectCount}. This batch took {batchElapsedTimeMs}ms. Error occurred after {elapsedTimeMs}ms. Total objects processed before error: {totalObjectsProcessed}.`
)
if (!requestDropped)
switch (e.constructor) {
case ObjectHandlingError:
res
.status(400)
.send(`Error inserting object in the database. ${e.message}`)
break
default:
res
.status(400)
.send(
'Error inserting object in the database. Check server logs for details'
)
}
requestDropped = true
})
promises.push(promise)
await promise
req.log.info(
{
...calculateLogMetadata({
batchSizeMb: estimateStringMegabyteSize(buffer),
start,
batchStartTime,
totalObjectsProcessed
}),
objectCount: objs.length,
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024
},
'Uploaded batch of {objectCount} objects. Total number of objects processed is {totalObjectsProcessed}. This batch took {batchElapsedTimeMs}ms.'
)
})
} else {
req.log.info(
{
mimeType,
totalObjectsProcessed
},
'Invalid ContentType header: {mimeType}. Total number of objects processed so far: {totalObjectsProcessed}.'
)
if (!requestDropped)
res
.status(400)
.send(
'Invalid ContentType header. This route only accepts "application/gzip", "text/plain" or "application/json".'
)
requestDropped = true
}
})
busboy.on('finish', async () => {
if (requestDropped) return
req.log.info(
{
totalObjectsProcessed,
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024,
elapsedTimeMs: Date.now() - start
},
'Upload finished: {totalObjectsProcessed} objects processed in {elapsedTimeMs}ms'
)
let previouslyAwaitedPromises = 0
while (previouslyAwaitedPromises !== promises.length) {
previouslyAwaitedPromises = promises.length
await Promise.all(promises)
}
res.status(201).end()
})
busboy.on('error', async (err) => {
req.log.info(
{
err,
totalObjectsProcessed,
elapsedTimeMs: Date.now() - start,
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024
},
'Error during upload. Error occurred after {elapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}. Error: {error}'
)
if (!requestDropped)
res
.status(400)
.end('{"error": "Upload request error. The server logs have more details."}')
requestDropped = true
})
req.pipe(busboy)
})
}