96bed71022
* Improves error logging
- use pino error logger correctly by passing in error as first argument
* monitor deployment: Filter logging at INFO level and above
* Use structured logging to create parameters for monitoring results
* Add structured logging to obj fileimport service
* Fileimport service, fix and improve logging
- use child logger with additional context where possible
- select appropriate logging level
- fix duplicated context in log statement
* REST endpoints, add context to structured logging and remove same context from message
* Webhook service provides context to bound logger to properly use structured logging
- Pass bound logger containing context to `makeNetworkRequest`
- do not log url, as it may contain a secret (like Discord's webhook urls), instead log the webhook Id
- log error message when network call fails
* upload: make better use of structured logging when recording data
* pino-pretty when in dev or test mode
- pino-pretty configured to send to stderr
* LOG_PRETTY env var
* Silence structured logging during testing
- can not rely on determining the port number by reading from stdout/stderr
- instead we determine which port is free, then create our server on that port
- we then poll that port until the server is ready before commencing tests
* Allow puppeteer to install chromium
* Do not need to install chromium separately
230 lines
6.9 KiB
JavaScript
230 lines
6.9 KiB
JavaScript
'use strict'
|
|
const zlib = require('zlib')
|
|
const cors = require('cors')
|
|
const Busboy = require('busboy')
|
|
|
|
const { validatePermissionsWriteStream } = require('./authUtils')
|
|
|
|
const { createObjectsBatched } = require('../services/objects')
|
|
const { uploadEndpointLogger } = require('@/logging/logging')
|
|
|
|
const MAX_FILE_SIZE = 50 * 1024 * 1024
|
|
|
|
module.exports = (app) => {
|
|
app.options('/objects/:streamId', cors())
|
|
|
|
app.post('/objects/:streamId', cors(), async (req, res) => {
|
|
const boundLogger = uploadEndpointLogger.child({
|
|
user: req.context.userId || '-',
|
|
streamId: req.params.streamId
|
|
})
|
|
|
|
const hasStreamAccess = await validatePermissionsWriteStream(
|
|
req.params.streamId,
|
|
req
|
|
)
|
|
if (!hasStreamAccess.result) {
|
|
return res.status(hasStreamAccess.status).end()
|
|
}
|
|
|
|
const busboy = Busboy({ headers: req.headers })
|
|
let totalProcessed = 0
|
|
// let last = {}
|
|
|
|
const promises = []
|
|
let requestDropped = false
|
|
|
|
busboy.on('file', (name, file, info) => {
|
|
const { mimeType } = info
|
|
|
|
if (requestDropped) return
|
|
|
|
if (mimeType === 'application/gzip') {
|
|
const buffer = []
|
|
|
|
file.on('data', (data) => {
|
|
if (data) buffer.push(data)
|
|
})
|
|
|
|
file.on('end', async () => {
|
|
if (requestDropped) return
|
|
const t0 = Date.now()
|
|
let objs = []
|
|
|
|
const gzippedBuffer = Buffer.concat(buffer)
|
|
if (gzippedBuffer.length > MAX_FILE_SIZE) {
|
|
boundLogger.error(
|
|
`Upload error: Batch size too large (${gzippedBuffer.length} > ${MAX_FILE_SIZE})`
|
|
)
|
|
if (!requestDropped)
|
|
res
|
|
.status(400)
|
|
.send(
|
|
`File size too large (${gzippedBuffer.length} > ${MAX_FILE_SIZE})`
|
|
)
|
|
requestDropped = true
|
|
}
|
|
|
|
const gunzippedBuffer = zlib.gunzipSync(gzippedBuffer).toString()
|
|
if (gunzippedBuffer.length > MAX_FILE_SIZE) {
|
|
boundLogger.error(
|
|
`Upload error: Batch size too large (${gunzippedBuffer.length} > ${MAX_FILE_SIZE})`
|
|
)
|
|
if (!requestDropped)
|
|
res
|
|
.status(400)
|
|
.send(
|
|
`File size too large (${gunzippedBuffer.length} > ${MAX_FILE_SIZE})`
|
|
)
|
|
requestDropped = true
|
|
}
|
|
|
|
try {
|
|
objs = JSON.parse(gunzippedBuffer)
|
|
} catch (e) {
|
|
boundLogger.error(`Upload error: Batch not in JSON format`)
|
|
if (!requestDropped) res.status(400).send('Failed to parse data.')
|
|
requestDropped = true
|
|
}
|
|
|
|
// last = objs[objs.length - 1]
|
|
totalProcessed += objs.length
|
|
|
|
let previouslyAwaitedPromises = 0
|
|
while (previouslyAwaitedPromises !== promises.length) {
|
|
previouslyAwaitedPromises = promises.length
|
|
await Promise.all(promises)
|
|
}
|
|
|
|
const promise = createObjectsBatched(req.params.streamId, objs).catch((e) => {
|
|
boundLogger.error(e, `Upload error.`)
|
|
if (!requestDropped)
|
|
res
|
|
.status(400)
|
|
.send(
|
|
'Error inserting object in the database. Check server logs for details'
|
|
)
|
|
requestDropped = true
|
|
})
|
|
promises.push(promise)
|
|
|
|
await promise
|
|
|
|
boundLogger.info(
|
|
{
|
|
durationSeconds: (Date.now() - t0) / 1000,
|
|
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024,
|
|
uploadedSizeMB: gunzippedBuffer.length / 1000000,
|
|
requestDropped
|
|
},
|
|
`Uploaded batch of ${objs.length} objects`
|
|
)
|
|
})
|
|
} else if (
|
|
mimeType === 'text/plain' ||
|
|
mimeType === 'application/json' ||
|
|
mimeType === 'application/octet-stream'
|
|
) {
|
|
let buffer = ''
|
|
|
|
file.on('data', (data) => {
|
|
if (data) buffer += data
|
|
})
|
|
|
|
file.on('end', async () => {
|
|
if (requestDropped) return
|
|
const t0 = Date.now()
|
|
let objs = []
|
|
|
|
if (buffer.length > MAX_FILE_SIZE) {
|
|
boundLogger.error(
|
|
`Upload error: Batch size too large (${buffer.length} > ${MAX_FILE_SIZE})`
|
|
)
|
|
if (!requestDropped)
|
|
res
|
|
.status(400)
|
|
.send(`File size too large (${buffer.length} > ${MAX_FILE_SIZE})`)
|
|
requestDropped = true
|
|
}
|
|
|
|
try {
|
|
objs = JSON.parse(buffer)
|
|
} catch (e) {
|
|
boundLogger.error(`Upload error: Batch not in JSON format`)
|
|
if (!requestDropped) res.status(400).send('Failed to parse data.')
|
|
requestDropped = true
|
|
}
|
|
|
|
totalProcessed += objs.length
|
|
|
|
let previouslyAwaitedPromises = 0
|
|
while (previouslyAwaitedPromises !== promises.length) {
|
|
previouslyAwaitedPromises = promises.length
|
|
await Promise.all(promises)
|
|
}
|
|
|
|
const promise = createObjectsBatched(req.params.streamId, objs).catch((e) => {
|
|
boundLogger.error(e, `Upload error.`)
|
|
if (!requestDropped)
|
|
res
|
|
.status(400)
|
|
.send(
|
|
'Error inserting object in the database. Check server logs for details'
|
|
)
|
|
requestDropped = true
|
|
})
|
|
promises.push(promise)
|
|
|
|
await promise
|
|
boundLogger.info(
|
|
{
|
|
uploadedSizeMB: buffer.length / 1000000,
|
|
durationSeconds: (Date.now() - t0) / 1000,
|
|
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024,
|
|
requestDropped
|
|
},
|
|
`Uploaded batch of ${objs.length} objects.`
|
|
)
|
|
})
|
|
} else {
|
|
boundLogger.error(`Invalid ContentType header: ${mimeType}`)
|
|
if (!requestDropped)
|
|
res
|
|
.status(400)
|
|
.send(
|
|
'Invalid ContentType header. This route only accepts "application/gzip", "text/plain" or "application/json".'
|
|
)
|
|
requestDropped = true
|
|
}
|
|
})
|
|
|
|
busboy.on('finish', async () => {
|
|
if (requestDropped) return
|
|
|
|
boundLogger.info(
|
|
{
|
|
crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024
|
|
},
|
|
`Upload finished: ${totalProcessed} objs`
|
|
)
|
|
|
|
let previouslyAwaitedPromises = 0
|
|
while (previouslyAwaitedPromises !== promises.length) {
|
|
previouslyAwaitedPromises = promises.length
|
|
await Promise.all(promises)
|
|
}
|
|
|
|
res.status(201).end()
|
|
})
|
|
|
|
busboy.on('error', async (err) => {
|
|
boundLogger.info(`Upload error: ${err}`)
|
|
if (!requestDropped)
|
|
res.status(400).end('Upload request error. The server logs have more details')
|
|
requestDropped = true
|
|
})
|
|
|
|
req.pipe(busboy)
|
|
})
|
|
}
|