diff --git a/packages/server/modules/blobstorage/domain/types.ts b/packages/server/modules/blobstorage/domain/types.ts index 484498783..9a6c4a557 100644 --- a/packages/server/modules/blobstorage/domain/types.ts +++ b/packages/server/modules/blobstorage/domain/types.ts @@ -19,3 +19,15 @@ export type BlobStorageItemInput = SetOptional< BlobStorageItem, 'fileSize' | 'fileType' | 'uploadStatus' | 'uploadError' | 'createdAt' | 'fileHash' > + +export type UploadResult = ProcessingResult & { + formKey: string +} + +export type ProcessingResult = { + uploadStatus?: number + uploadError?: Nullable + blobId: string + fileName: string + fileSize: Nullable +} diff --git a/packages/server/modules/blobstorage/rest/router.ts b/packages/server/modules/blobstorage/rest/router.ts index 88801b282..66d1f02b1 100644 --- a/packages/server/modules/blobstorage/rest/router.ts +++ b/packages/server/modules/blobstorage/rest/router.ts @@ -70,7 +70,7 @@ export const blobStorageRouterFactory = (): Router => { streamId, userId, logger: req.log, - onFinishAllFileUploads: (uploadResults) => { + onFinishAllFileUploads: async (uploadResults) => { res.status(201).send({ uploadResults }) }, onError: () => { diff --git a/packages/server/modules/blobstorage/services/management.ts b/packages/server/modules/blobstorage/services/management.ts index bf9499ab0..948883df7 100644 --- a/packages/server/modules/blobstorage/services/management.ts +++ b/packages/server/modules/blobstorage/services/management.ts @@ -1,4 +1,4 @@ -import { +import type { DeleteBlob, DeleteBlobAndAssociatedObject, GetBlobMetadata, @@ -7,11 +7,12 @@ import { UploadFileStream, UpsertBlob } from '@/modules/blobstorage/domain/operations' -import { BlobStorageItem } from '@/modules/blobstorage/domain/types' +import type { BlobStorageItem } from '@/modules/blobstorage/domain/types' import { getObjectKey } from '@/modules/blobstorage/helpers/blobs' import { BadRequestError } from '@/modules/shared/errors' import { getFileSizeLimitMB } from '@/modules/shared/helpers/envHelper' -import { MaybeAsync } from '@speckle/shared' +import type { MaybeAsync } from '@speckle/shared' +import type { ProcessingResult } from '@/modules/blobstorage/domain/types' /** * File size limit in bytes @@ -90,7 +91,12 @@ const updateBlobMetadataFactory = }) const updateData = await updateCallback({ objectKey: objectKey! }) await deps.updateBlob({ id: blobId, item: updateData, streamId }) - return { blobId, fileName, ...updateData } + return { + blobId, + ...updateData, + fileName: updateData.fileName ?? fileName, // ensure the fileName is not undefined + fileSize: updateData.fileSize ?? null // ensure the fileSize is not undefined + } } export const markUploadSuccessFactory = @@ -99,7 +105,7 @@ export const markUploadSuccessFactory = getObjectAttributes: (params: ObjectKeyPayload) => MaybeAsync<{ fileSize: number }>, streamId: string, blobId: string - ) => { + ): Promise => { const updateBlobMetadata = updateBlobMetadataFactory(deps) return await updateBlobMetadata(streamId, blobId, async ({ objectKey }) => { const { fileSize } = await getObjectAttributes({ objectKey }) @@ -117,7 +123,7 @@ export const markUploadErrorFactory = streamId: string, blobId: string, error: string - ) => { + ): Promise => { const updateBlobMetadata = updateBlobMetadataFactory(deps) return await updateBlobMetadata(streamId, blobId, async ({ objectKey }) => { await deleteObject({ objectKey }) @@ -127,7 +133,11 @@ export const markUploadErrorFactory = export const markUploadOverFileSizeLimitFactory = (deps: UpdateBlobMetadataDeps) => - async (deleteObject: DeleteObjectFromStorage, streamId: string, blobId: string) => { + async ( + deleteObject: DeleteObjectFromStorage, + streamId: string, + blobId: string + ): Promise => { const markUploadError = markUploadErrorFactory(deps) return await markUploadError( deleteObject, diff --git a/packages/server/modules/blobstorage/services/streams.ts b/packages/server/modules/blobstorage/services/streams.ts index b226b2b70..e63a07ea2 100644 --- a/packages/server/modules/blobstorage/services/streams.ts +++ b/packages/server/modules/blobstorage/services/streams.ts @@ -15,28 +15,24 @@ import { getObjectAttributesFactory, storeFileStreamFactory } from '@/modules/blobstorage/repositories/blobs' -import { ensureError } from '@speckle/shared' +import { ensureError, Nullable } from '@speckle/shared' import { getProjectObjectStorage } from '@/modules/multiregion/utils/blobStorageSelector' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import type { Logger } from '@/observability/logging' import type { Writable } from 'stream' import { get } from 'lodash' +import { UploadResult } from '@/modules/blobstorage/domain/types' +import { ProcessingResult } from '@/modules/blobstorage/domain/types' type NewFileStreamProcessor = (params: { writeable: Writable streamId: string userId: string - onFinishAllFileUploads: (results: UploadResult[]) => void + onFinishAllFileUploads: (results: Array) => Promise onError: (err: unknown) => void logger: Logger }) => Promise -type UploadResult = { - uploadStatus?: number - uploadError?: Error | null | string - formKey: string -} - export const processNewFileStreamFactory = (): NewFileStreamProcessor => { return async (params) => { const { writeable, streamId, userId, onFinishAllFileUploads, onError } = params @@ -46,6 +42,9 @@ export const processNewFileStreamFactory = (): NewFileStreamProcessor => { uploadStatus?: number uploadError?: Error | null | string formKey: string + blobId: string + fileName: string + fileSize: Nullable }>[] = [] const [projectDb, projectStorage] = await Promise.all([ @@ -82,12 +81,7 @@ export const processNewFileStreamFactory = (): NewFileStreamProcessor => { const { filename: fileName } = info const fileType = fileName?.split('.')?.pop()?.toLowerCase() logger = logger.child({ fileName, fileType }) - const registerUploadResult = ( - processingPromise: Promise<{ - uploadStatus?: number - uploadError?: Error | null | string - }> - ) => { + const registerUploadResult = (processingPromise: Promise) => { finalizePromises.push( processingPromise.then((resultItem) => ({ ...resultItem, formKey })) ) @@ -141,7 +135,7 @@ export const processNewFileStreamFactory = (): NewFileStreamProcessor => { await Promise.all(Object.values(uploadOperations)) // have to make sure all finalize promises have been awaited const uploadResults = await Promise.all(finalizePromises) - onFinishAllFileUploads(uploadResults) + await onFinishAllFileUploads(uploadResults) return }) diff --git a/packages/server/modules/fileuploads/index.ts b/packages/server/modules/fileuploads/index.ts index ba81c07df..5ca76b596 100644 --- a/packages/server/modules/fileuploads/index.ts +++ b/packages/server/modules/fileuploads/index.ts @@ -1,28 +1,18 @@ /* istanbul ignore file */ -import { insertNewUploadAndNotifyFactory } from '@/modules/fileuploads/services/management' -import request from 'request' -import { authMiddlewareCreator } from '@/modules/shared/middleware' import { moduleLogger } from '@/observability/logging' import { onFileImportProcessedFactory, - onFileProcessingFactory, parseMessagePayload } from '@/modules/fileuploads/services/resultListener' -import { - getFileInfoFactory, - saveUploadFileFactory -} from '@/modules/fileuploads/repositories/fileUploads' -import { db } from '@/db/knex' -import { publish } from '@/modules/shared/utils/subscriptions' +import { getFileInfoFactory } from '@/modules/fileuploads/repositories/fileUploads' +import { FileImportSubscriptions, publish } from '@/modules/shared/utils/subscriptions' import { SpeckleModule } from '@/modules/shared/helpers/typeHelper' -import { streamWritePermissionsPipelineFactory } from '@/modules/shared/authz' -import { getRolesFactory } from '@/modules/shared/repositories/roles' import { getStreamBranchByNameFactory } from '@/modules/core/repositories/branches' -import { getStreamFactory } from '@/modules/core/repositories/streams' -import { getPort, isFileUploadsEnabled } from '@/modules/shared/helpers/envHelper' +import { isFileUploadsEnabled } from '@/modules/shared/helpers/envHelper' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { listenFor } from '@/modules/core/utils/dbNotificationListener' import { getEventBus } from '@/modules/shared/services/eventBus' +import { fileuploadRouterFactory } from '@/modules/fileuploads/rest/router' export const init: SpeckleModule['init'] = async (app, isInitial) => { if (!isFileUploadsEnabled()) { @@ -31,99 +21,10 @@ export const init: SpeckleModule['init'] = async (app, isInitial) => { } moduleLogger.info('📄 Init FileUploads module') - app.post( - '/api/file/:fileType/:streamId/:branchName?', - async (req, res, next) => { - await authMiddlewareCreator( - streamWritePermissionsPipelineFactory({ - getRoles: getRolesFactory({ db }), - getStream: getStreamFactory({ db }) - }) - )(req, res, next) - }, - async (req, res) => { - const branchName = req.params.branchName || 'main' - req.log = req.log.child({ - streamId: req.params.streamId, - userId: req.context.userId, - branchName - }) - - const projectDb = await getProjectDbClient({ projectId: req.params.streamId }) - const insertNewUploadAndNotify = insertNewUploadAndNotifyFactory({ - getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }), - saveUploadFile: saveUploadFileFactory({ db: projectDb }), - publish - }) - const saveFileUploads = async ({ - userId, - streamId, - branchName, - uploadResults - }: { - userId: string - streamId: string - branchName: string - uploadResults: Array<{ - blobId: string - fileName: string - fileSize: number - }> - }) => { - await Promise.all( - uploadResults.map(async (upload) => { - await insertNewUploadAndNotify({ - fileId: upload.blobId, - streamId, - branchName, - userId, - fileName: upload.fileName, - fileType: upload.fileName.split('.').pop()!, - fileSize: upload.fileSize - }) - }) - ) - } - //TODO refactor packages/server/modules/blobstorage/index.ts to use the service pattern, and then refactor this to call the service directly from here without the http overhead - const pipedReq = request( - // we call this same server on localhost (IPv4) to upload the blob and do not make an external call - `http://127.0.0.1:${getPort()}/api/stream/${req.params.streamId}/blob`, - async (err, response, body) => { - if (err) { - res.log.error(err, 'Error while uploading blob.') - res.status(500).send(err.message) - return - } - if (response.statusCode === 201) { - const { uploadResults } = JSON.parse(body) - await saveFileUploads({ - userId: req.context.userId!, - streamId: req.params.streamId, - branchName, - uploadResults - }) - } else { - res.log.error( - { - statusCode: response.statusCode, - path: `http://127.0.0.1:${getPort()}/api/stream/${ - req.params.streamId - }/blob` - }, - 'Error while uploading file.' - ) - } - res.contentType('application/json') - res.status(response.statusCode).send(body) - } - ) - - req.pipe(pipedReq as unknown as NodeJS.WritableStream) - } - ) + app.use(fileuploadRouterFactory()) if (isInitial) { - listenFor('file_import_update', async (msg) => { + listenFor(FileImportSubscriptions.ProjectFileImportUpdated, async (msg) => { const parsedMessage = parseMessagePayload(msg.payload) if (!parsedMessage.streamId) return const projectDb = await getProjectDbClient({ projectId: parsedMessage.streamId }) @@ -134,14 +35,5 @@ export const init: SpeckleModule['init'] = async (app, isInitial) => { eventEmit: getEventBus().emit })(parsedMessage) }) - listenFor('file_import_started', async (msg) => { - const parsedMessage = parseMessagePayload(msg.payload) - if (!parsedMessage.streamId) return - const projectDb = await getProjectDbClient({ projectId: parsedMessage.streamId }) - await onFileProcessingFactory({ - getFileInfo: getFileInfoFactory({ db: projectDb }), - publish - })(parsedMessage) - }) } } diff --git a/packages/server/modules/fileuploads/rest/router.ts b/packages/server/modules/fileuploads/rest/router.ts new file mode 100644 index 000000000..294c2fd4f --- /dev/null +++ b/packages/server/modules/fileuploads/rest/router.ts @@ -0,0 +1,111 @@ +import { Router } from 'express' +import { insertNewUploadAndNotifyFactory } from '@/modules/fileuploads/services/management' +import { authMiddlewareCreator } from '@/modules/shared/middleware' +import { saveUploadFileFactory } from '@/modules/fileuploads/repositories/fileUploads' +import { db } from '@/db/knex' +import { publish } from '@/modules/shared/utils/subscriptions' +import { streamWritePermissionsPipelineFactory } from '@/modules/shared/authz' +import { getRolesFactory } from '@/modules/shared/repositories/roles' +import { getStreamBranchByNameFactory } from '@/modules/core/repositories/branches' +import { getStreamFactory } from '@/modules/core/repositories/streams' +import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' +import { createBusboy } from '@/modules/blobstorage/rest/busboy' +import { processNewFileStreamFactory } from '@/modules/blobstorage/services/streams' +import { UnauthorizedError } from '@/modules/shared/errors' +import { Nullable } from '@speckle/shared' + +export const fileuploadRouterFactory = (): Router => { + const processNewFileStream = processNewFileStreamFactory() + + const app = Router() + + app.post( + '/api/file/:fileType/:streamId/:branchName?', + async (req, res, next) => { + await authMiddlewareCreator( + streamWritePermissionsPipelineFactory({ + getRoles: getRolesFactory({ db }), + getStream: getStreamFactory({ db }) + }) + )(req, res, next) + }, + async (req, res) => { + const branchName = req.params.branchName || 'main' + const streamId = req.params.streamId + const userId = req.context.userId + if (!userId) { + throw new UnauthorizedError('User not authenticated.') + } + const logger = req.log.child({ + streamId, + userId, + branchName + }) + + const projectDb = await getProjectDbClient({ projectId: streamId }) + const insertNewUploadAndNotify = insertNewUploadAndNotifyFactory({ + getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }), + saveUploadFile: saveUploadFileFactory({ db: projectDb }), + publish + }) + const saveFileUploads = async ({ + userId, + streamId, + branchName, + uploadResults + }: { + userId: string + streamId: string + branchName: string + uploadResults: Array<{ + blobId: string + fileName: string + fileSize: Nullable + }> + }) => { + await Promise.all( + uploadResults.map(async (upload) => { + await insertNewUploadAndNotify({ + fileId: upload.blobId, + streamId, + branchName, + userId, + fileName: upload.fileName, + fileType: upload.fileName?.split('.').pop() || '', //FIXME + fileSize: upload.fileSize + }) + }) + ) + } + + const busboy = createBusboy(req) + const newFileStreamProcessor = await processNewFileStream({ + writeable: busboy, + streamId, + userId, + logger, + onFinishAllFileUploads: async (uploadResults) => { + await saveFileUploads({ + userId: req.context.userId!, + streamId: req.params.streamId, + branchName, + uploadResults + }) + res.status(201).send({ uploadResults }) + }, + onError: () => { + res.contentType('application/json') + res + .status(400) + .end( + '{ "error": "Upload request error. The server logs may have more details." }' + ) + } + }) + + req.pipe(newFileStreamProcessor) + } + ) + + return app +}