Files
speckle-server/packages/server/modules/blobstorage/index.js
T
Gergő Jedlicska ed458fb619 Add blob storage backend (#802)
* feat(server): add server authz pipeline rework first sketch

* feat(server authz): add new server authz middleware poc implementation

* test(server authz): add unittests for the new server authz workflow

* feat(wip rework of fileuploads vs blob storage): add basic impl of separate blob storage service

* feat(fileimport service): refactored file import service to utilize the new assetstorage service

* refactor(server errors): refactor server errors to use the shared module definitions

Now all the errors inherit from BaseError

* refactor(fileimport service): cleanup after refactor

* feat(frontend fileimports): use the new blob storage for downloading the original file

* refactor(server fileimports): clean up the remnants of S3 storage from file imports

* refactor(server authz): centralize generic authz pipeline configs

* refactor(server blob storage): refactor / rename everything to use the `blob-storage` name

* ci(circleci): add s3 objectstorage environment variables

* ci(circleci): fix missing env variables

* ci(circleci): add minio test container

* ci(circleci): fix minio app startup

* ci(circleci): enable circleci remote docker

* ci(circleci): fix minio startup

* ci(circleci): detach and wait properly for minio to start

* ci(circleci): revert to additional minio img config, it only fails when the container is stopped ?!

* ci(circleci): disable file uploads

* fix(fileimports): update with blob storage refactor leftovers

* feat(server blob storage): add blob storage graphql api

* refactor(server errors): merge new errors to shared module

* fix(server comments rte): fix import for RTE error

* chore(fileimports): remove node-fetch from dependency

* chore(server): remove body parser dependency

* fix(server blob storage): fix gql api

* fix(frontend): fix fileupload item not loading the new upload status, cause of premature event fire

* feat(server blob storage): fix file size limit and allow for public streams

* Update packages/server/modules/blobstorage/graph/schemas/blobstorage.graphql

Co-authored-by: Kristaps Fabians Geikins <fabis94@live.com>

* chore(blobstorage): fix PR review issues

* fix(server): fix import bugs

Co-authored-by: Kristaps Fabians Geikins <fabis94@live.com>
2022-06-16 11:31:03 +02:00

208 lines
6.1 KiB
JavaScript

const debug = require('debug')
const { contextMiddleware } = require('@/modules/shared')
const Busboy = require('busboy')
const {
authMiddlewareCreator,
streamReadPermissions,
streamWritePermissions,
allowForAllRegisteredUsersOnPublicStreamsWithPublicComments
} = require('@/modules/shared/authz')
const {
ensureStorageAccess,
storeFileStream,
getObjectStream,
deleteObject,
getObjectAttributes
} = require('@/modules/blobstorage/objectStorage')
const crs = require('crypto-random-string')
const {
uploadFileStream,
getFileStream,
markUploadError,
markUploadSuccess,
markUploadOverFileSizeLimit,
deleteBlob,
getBlobMetadata,
getBlobMetadataCollection
} = require('@/modules/blobstorage/services')
const { NotFoundError, ResourceMismatch } = require('@/modules/shared/errors')
/**
 * Runs the startup checks of the blob storage module.
 *
 * - If the DISABLE_FILE_UPLOADS env variable is set (non-empty), it only logs that blob
 *   storage is disabled and stops.
 * - Otherwise it logs the module init and awaits `ensureStorageAccess()`.
 * - Afterwards, a missing S3_BUCKET env variable is reported on the error logger.
 *
 * The function only logs; it resolves to `undefined` on every path.
 *
 * @returns {Promise<void>}
 */
const ensureConditions = async () => {
  const uploadsDisabled = Boolean(process.env.DISABLE_FILE_UPLOADS)
  if (uploadsDisabled) {
    debug('speckle:modules')('📦 Blob storage is DISABLED')
    return
  }

  debug('speckle:modules')('📦 Init BlobStorage module')
  await ensureStorageAccess()

  // Nothing more to report when a bucket name is configured
  const bucketName = process.env.S3_BUCKET
  if (bucketName) return

  debug('speckle:error')(
    'S3_BUCKET env variable was not specified. 📦 BlobStorage will be DISABLED.'
  )
}
/**
 * Runs a route callback and converts any error it throws into an HTTP response.
 *
 * Status mapping:
 * - `NotFoundError`    -> 404
 * - `ResourceMismatch` -> 400
 * - anything else      -> 500 (also written to the `speckle:error` log)
 *
 * The response body is always `{ error: <error message> }`.
 *
 * If the response headers were already sent when the callback fails, no second
 * response is attempted: the failure is logged and the response is ended if it
 * is still open.
 *
 * @param {object} req - the request, forwarded to the callback
 * @param {object} res - the response, forwarded to the callback and used for error replies
 * @param {(req: object, res: object) => Promise<void>} callback - the actual route logic
 * @returns {Promise<void>}
 */
const errorHandler = async (req, res, callback) => {
  try {
    await callback(req, res)
  } catch (err) {
    // Setting a status after the headers went out would throw from inside this catch block
    // and surface as an unhandled rejection, so only close the already started response.
    if (res.headersSent) {
      debug('speckle:error')(
        `Blob storage request failed after the response was started: ${err}`
      )
      if (!res.writableEnded) res.end()
      return
    }

    // `throw null` / `throw undefined` must not crash the handler itself
    const message = err?.message
    if (err instanceof NotFoundError) {
      res.status(404).send({ error: message })
    } else if (err instanceof ResourceMismatch) {
      res.status(400).send({ error: message })
    } else {
      // Unexpected failures would otherwise leave no trace on the server side
      debug('speckle:error')(`Blob storage request failed: ${err}`)
      res.status(500).send({ error: message })
    }
  }
}
exports.init = async (app) => {
await ensureConditions()
// eslint-disable-next-line no-unused-vars
app.post(
'/api/stream/:streamId/blob',
contextMiddleware,
authMiddlewareCreator([
...streamWritePermissions,
// todo should we add public comments upload escape hatch?
allowForAllRegisteredUsersOnPublicStreamsWithPublicComments
]),
async (req, res) => {
// no checking of startup conditions, just dont init the endpoints if not configured right
//authorize request
const uploadOperations = {}
const finalizePromises = []
const busboy = Busboy({
headers: req.headers,
// this is 100 MB which matches the current frontend file size limit
limits: { fileSize: 104_857_600 }
})
const streamId = req.params.streamId
busboy.on('file', (name, file, info) => {
const { filename: fileName } = info
const fileType = fileName.split('.').pop().toLowerCase()
const blobId = crs({ length: 10 })
uploadOperations[blobId] = uploadFileStream(
storeFileStream,
{ streamId, userId: req.context.userId },
{ blobId, fileName, fileType, fileStream: file }
)
//this file level 'close' is fired when a single file upload finishes
//this way individual upload statuses can be updated, when done
file.on('close', async () => {
//this is handled by the file.on('limit', ...) event
if (file.truncated) return
await uploadOperations[blobId]
finalizePromises.push(
markUploadSuccess(getObjectAttributes, streamId, blobId)
)
})
file.on('limit', () => {
finalizePromises.push(
markUploadOverFileSizeLimit(deleteObject, streamId, blobId)
)
})
file.on('error', (err) => {
console.log(err)
finalizePromises.push(
markUploadError(deleteObject, blobId, 'i need some error info here')
)
})
})
busboy.on('finish', async () => {
// make sure all upload operations have been awaited,
// otherwise the finish even can fire before all async operations finish
//resulting in missing return values
await Promise.all(Object.values(uploadOperations))
// have to make sure all finalize promises have been awaited
const uploadResults = await Promise.all(finalizePromises)
res.status(201).send({ uploadResults })
})
busboy.on('error', (err) => {
debug('speckle:error')(`File upload error: ${err}`)
const status = 400
const response = 'Upload request error. The server logs have more details'
res.status(status).end(response)
})
req.pipe(busboy)
}
)
app.get(
'/api/stream/:streamId/blob/:blobId',
contextMiddleware,
authMiddlewareCreator([
...streamReadPermissions,
allowForAllRegisteredUsersOnPublicStreamsWithPublicComments
]),
async (req, res) => {
errorHandler(req, res, async (req, res) => {
const { fileName } = await getBlobMetadata({
streamId: req.params.streamId,
blobId: req.params.blobId
})
const fileStream = await getFileStream({
getObjectStream,
streamId: req.params.streamId,
blobId: req.params.blobId
})
res.writeHead(200, {
'Content-Type': 'application/octet-stream',
'Content-Disposition': `attachment; filename="${fileName}"`
})
fileStream.pipe(res)
})
}
)
app.delete(
'/api/stream/:streamId/blob/:blobId',
contextMiddleware,
authMiddlewareCreator(streamWritePermissions),
async (req, res) => {
errorHandler(req, res, async (req, res) => {
await deleteBlob({
streamId: req.params.streamId,
blobId: req.params.blobId,
deleteObject
})
res.status(204).send()
})
}
)
app.get(
'/api/stream/:streamId/blobs',
contextMiddleware,
authMiddlewareCreator(streamWritePermissions),
async (req, res) => {
const fileName = req.query.fileName
errorHandler(req, res, async (req, res) => {
const blobMetadataCollection = await getBlobMetadataCollection({
streamId: req.params.streamId,
query: fileName
})
res.status(200).send(blobMetadataCollection)
})
}
)
app.delete(
'/api/stream/:streamId/blobs',
contextMiddleware,
authMiddlewareCreator(streamWritePermissions)
// async (req, res) => {}
)
}
/**
 * Finalization hook of the blob storage module.
 * The module has nothing to finalize, so this is a deliberate no-op.
 *
 * @returns {void}
 */
exports.finalize = () => undefined