3d6653f73b
* Update to new specklepy (#1173) * Publish images for all branches but limit tagging * only tag 'latest' and '2' when 'SHOULD_PUBLISH' variable is 'true' * Publishing helm chart should check for `SHOULD_PUBLISH` * Move blocking step to publish-helm chart, and allow images to be published * Pin python requirements and bump to latest versions * Fix EOL whitespace * use valid version for psycopg2-binary (the clue is in the 2!) * fix(fileimports): add exception printing to file imports * fix(fileimports): bump specklepy version move to a specklepy version that contains a fix for send without writing to disk Co-authored-by: Iain Sproat <68657+iainsproat@users.noreply.github.com> * Fixes liveness and readiness checks to prevent CSRF error message (#1169) - provides content-type header - check that status code is 200 * Fixes broken helm template by adding quotation marks around liveness probe command (#1171) * fix(server activities): make sure the stream events are properly dispatched * feat(server webhooks): add scheduled orphaned webhook cleanup * test(server webhooks): add test to webhook cleanup service * feat(server webhooks): drop foreign key reference for webhooks schema to streams * refactor(server req context): refactor req context to have the ip attribute for all requests * feat(server objects rest api): add ratelimits to objects rest api endpoints * fix(server rest api): properly handle returning 419 Co-authored-by: Iain Sproat <68657+iainsproat@users.noreply.github.com>
263 lines
8.1 KiB
JavaScript
263 lines
8.1 KiB
JavaScript
'use strict'
|
|
const zlib = require('zlib')
|
|
const cors = require('cors')
|
|
const Busboy = require('busboy')
|
|
const debug = require('debug')
|
|
|
|
const { contextMiddleware } = require('@/modules/shared')
|
|
const { validatePermissionsWriteStream } = require('./authUtils')
|
|
|
|
const { createObjectsBatched } = require('../services/objects')
|
|
const {
|
|
rejectsRequestWithRatelimitStatusIfNeeded
|
|
} = require('@/modules/core/services/ratelimits')
|
|
|
|
const MAX_FILE_SIZE = 50 * 1024 * 1024
|
|
|
|
module.exports = (app) => {
|
|
app.options('/objects/:streamId', cors())
|
|
|
|
app.post('/objects/:streamId', cors(), contextMiddleware, async (req, res) => {
|
|
const rejected = await rejectsRequestWithRatelimitStatusIfNeeded({
|
|
action: 'POST /objects/:streamId',
|
|
req,
|
|
res
|
|
})
|
|
if (rejected) return rejected
|
|
|
|
const hasStreamAccess = await validatePermissionsWriteStream(
|
|
req.params.streamId,
|
|
req
|
|
)
|
|
if (!hasStreamAccess.result) {
|
|
return res.status(hasStreamAccess.status).end()
|
|
}
|
|
|
|
const busboy = Busboy({ headers: req.headers })
|
|
let totalProcessed = 0
|
|
// let last = {}
|
|
|
|
const promises = []
|
|
let requestDropped = false
|
|
|
|
busboy.on('file', (name, file, info) => {
|
|
const { mimeType } = info
|
|
|
|
if (requestDropped) return
|
|
|
|
if (mimeType === 'application/gzip') {
|
|
const buffer = []
|
|
|
|
file.on('data', (data) => {
|
|
if (data) buffer.push(data)
|
|
})
|
|
|
|
file.on('end', async () => {
|
|
if (requestDropped) return
|
|
const t0 = Date.now()
|
|
let objs = []
|
|
|
|
const gzippedBuffer = Buffer.concat(buffer)
|
|
if (gzippedBuffer.length > MAX_FILE_SIZE) {
|
|
debug('speckle:error')(
|
|
`[User ${
|
|
req.context.userId || '-'
|
|
}] Upload error: Batch size too large (${
|
|
gzippedBuffer.length
|
|
} > ${MAX_FILE_SIZE})`
|
|
)
|
|
if (!requestDropped)
|
|
res
|
|
.status(400)
|
|
.send(
|
|
`File size too large (${gzippedBuffer.length} > ${MAX_FILE_SIZE})`
|
|
)
|
|
requestDropped = true
|
|
}
|
|
|
|
const gunzippedBuffer = zlib.gunzipSync(gzippedBuffer).toString()
|
|
if (gunzippedBuffer.length > MAX_FILE_SIZE) {
|
|
debug('speckle:error')(
|
|
`[User ${
|
|
req.context.userId || '-'
|
|
}] Upload error: Batch size too large (${
|
|
gunzippedBuffer.length
|
|
} > ${MAX_FILE_SIZE})`
|
|
)
|
|
if (!requestDropped)
|
|
res
|
|
.status(400)
|
|
.send(
|
|
`File size too large (${gunzippedBuffer.length} > ${MAX_FILE_SIZE})`
|
|
)
|
|
requestDropped = true
|
|
}
|
|
|
|
try {
|
|
objs = JSON.parse(gunzippedBuffer)
|
|
} catch (e) {
|
|
debug('speckle:error')(
|
|
`[User ${
|
|
req.context.userId || '-'
|
|
}] Upload error: Batch not in JSON format`
|
|
)
|
|
if (!requestDropped) res.status(400).send('Failed to parse data.')
|
|
requestDropped = true
|
|
}
|
|
|
|
// last = objs[objs.length - 1]
|
|
totalProcessed += objs.length
|
|
|
|
let previouslyAwaitedPromises = 0
|
|
while (previouslyAwaitedPromises !== promises.length) {
|
|
previouslyAwaitedPromises = promises.length
|
|
await Promise.all(promises)
|
|
}
|
|
|
|
const promise = createObjectsBatched(req.params.streamId, objs).catch((e) => {
|
|
debug('speckle:error')(
|
|
`[User ${req.context.userId || '-'}] Upload error: ${e.message}`
|
|
)
|
|
if (!requestDropped)
|
|
res
|
|
.status(400)
|
|
.send(
|
|
'Error inserting object in the database. Check server logs for details'
|
|
)
|
|
requestDropped = true
|
|
})
|
|
promises.push(promise)
|
|
|
|
await promise
|
|
|
|
debug('speckle:info')(
|
|
`[User ${req.context.userId || '-'}] Uploaded batch of ${
|
|
objs.length
|
|
} objects to stream ${req.params.streamId} (size: ${
|
|
gunzippedBuffer.length / 1000000
|
|
} MB, duration: ${(Date.now() - t0) / 1000}s, crtMemUsage: ${
|
|
process.memoryUsage().heapUsed / 1024 / 1024
|
|
} MB, dropped=${requestDropped})`
|
|
)
|
|
})
|
|
} else if (
|
|
mimeType === 'text/plain' ||
|
|
mimeType === 'application/json' ||
|
|
mimeType === 'application/octet-stream'
|
|
) {
|
|
let buffer = ''
|
|
|
|
file.on('data', (data) => {
|
|
if (data) buffer += data
|
|
})
|
|
|
|
file.on('end', async () => {
|
|
if (requestDropped) return
|
|
const t0 = Date.now()
|
|
let objs = []
|
|
|
|
if (buffer.length > MAX_FILE_SIZE) {
|
|
debug('speckle:error')(
|
|
`[User ${
|
|
req.context.userId || '-'
|
|
}] Upload error: Batch size too large (${
|
|
buffer.length
|
|
} > ${MAX_FILE_SIZE})`
|
|
)
|
|
if (!requestDropped)
|
|
res
|
|
.status(400)
|
|
.send(`File size too large (${buffer.length} > ${MAX_FILE_SIZE})`)
|
|
requestDropped = true
|
|
}
|
|
|
|
try {
|
|
objs = JSON.parse(buffer)
|
|
} catch (e) {
|
|
debug('speckle:error')(
|
|
`[User ${
|
|
req.context.userId || '-'
|
|
}] Upload error: Batch not in JSON format`
|
|
)
|
|
if (!requestDropped) res.status(400).send('Failed to parse data.')
|
|
requestDropped = true
|
|
}
|
|
// last = objs[objs.length - 1]
|
|
totalProcessed += objs.length
|
|
|
|
let previouslyAwaitedPromises = 0
|
|
while (previouslyAwaitedPromises !== promises.length) {
|
|
previouslyAwaitedPromises = promises.length
|
|
await Promise.all(promises)
|
|
}
|
|
|
|
const promise = createObjectsBatched(req.params.streamId, objs).catch((e) => {
|
|
debug('speckle:error')(
|
|
`[User ${req.context.userId || '-'}] Upload error: ${e.message}`
|
|
)
|
|
if (!requestDropped)
|
|
res
|
|
.status(400)
|
|
.send(
|
|
'Error inserting object in the database. Check server logs for details'
|
|
)
|
|
requestDropped = true
|
|
})
|
|
promises.push(promise)
|
|
|
|
await promise
|
|
debug('speckle:info')(
|
|
`[User ${req.context.userId || '-'}] Uploaded batch of ${
|
|
objs.length
|
|
} objects to stream ${req.params.streamId} (size: ${
|
|
buffer.length / 1000000
|
|
} MB, duration: ${(Date.now() - t0) / 1000}s, crtMemUsage: ${
|
|
process.memoryUsage().heapUsed / 1024 / 1024
|
|
} MB, dropped=${requestDropped})`
|
|
)
|
|
})
|
|
} else {
|
|
debug('speckle:error')(
|
|
`[User ${req.context.userId || '-'}] Invalid ContentType header: ${mimeType}`
|
|
)
|
|
if (!requestDropped)
|
|
res
|
|
.status(400)
|
|
.send(
|
|
'Invalid ContentType header. This route only accepts "application/gzip", "text/plain" or "application/json".'
|
|
)
|
|
requestDropped = true
|
|
}
|
|
})
|
|
|
|
busboy.on('finish', async () => {
|
|
if (requestDropped) return
|
|
|
|
debug('speckle:upload-endpoint')(
|
|
`[User ${req.context.userId || '-'}] Upload finished: ${totalProcessed} objs, ${
|
|
process.memoryUsage().heapUsed / 1024 / 1024
|
|
} MB mem`
|
|
)
|
|
|
|
let previouslyAwaitedPromises = 0
|
|
while (previouslyAwaitedPromises !== promises.length) {
|
|
previouslyAwaitedPromises = promises.length
|
|
await Promise.all(promises)
|
|
}
|
|
|
|
res.status(201).end()
|
|
})
|
|
|
|
busboy.on('error', async (err) => {
|
|
debug('speckle:upload-endpoint')(
|
|
`[User ${req.context.userId || '-'}] Upload error: ${err}`
|
|
)
|
|
if (!requestDropped)
|
|
res.status(400).end('Upload request error. The server logs have more details')
|
|
requestDropped = true
|
|
})
|
|
|
|
req.pipe(busboy)
|
|
})
|
|
}
|