refactor fileuploads to avoid http request

This commit is contained in:
Iain Sproat
2025-03-05 16:39:23 +00:00
parent 0035783932
commit 7c6ebc95dc
6 changed files with 156 additions and 137 deletions
@@ -19,3 +19,15 @@ export type BlobStorageItemInput = SetOptional<
BlobStorageItem,
'fileSize' | 'fileType' | 'uploadStatus' | 'uploadError' | 'createdAt' | 'fileHash'
>
export type UploadResult = ProcessingResult & {
formKey: string
}
export type ProcessingResult = {
uploadStatus?: number
uploadError?: Nullable<Error | string>
blobId: string
fileName: string
fileSize: Nullable<number>
}
@@ -70,7 +70,7 @@ export const blobStorageRouterFactory = (): Router => {
streamId,
userId,
logger: req.log,
onFinishAllFileUploads: (uploadResults) => {
onFinishAllFileUploads: async (uploadResults) => {
res.status(201).send({ uploadResults })
},
onError: () => {
@@ -1,4 +1,4 @@
import {
import type {
DeleteBlob,
DeleteBlobAndAssociatedObject,
GetBlobMetadata,
@@ -7,11 +7,12 @@ import {
UploadFileStream,
UpsertBlob
} from '@/modules/blobstorage/domain/operations'
import { BlobStorageItem } from '@/modules/blobstorage/domain/types'
import type { BlobStorageItem } from '@/modules/blobstorage/domain/types'
import { getObjectKey } from '@/modules/blobstorage/helpers/blobs'
import { BadRequestError } from '@/modules/shared/errors'
import { getFileSizeLimitMB } from '@/modules/shared/helpers/envHelper'
import { MaybeAsync } from '@speckle/shared'
import type { MaybeAsync } from '@speckle/shared'
import type { ProcessingResult } from '@/modules/blobstorage/domain/types'
/**
* File size limit in bytes
@@ -90,7 +91,12 @@ const updateBlobMetadataFactory =
})
const updateData = await updateCallback({ objectKey: objectKey! })
await deps.updateBlob({ id: blobId, item: updateData, streamId })
return { blobId, fileName, ...updateData }
return {
blobId,
...updateData,
fileName: updateData.fileName ?? fileName, // ensure the fileName is not undefined
fileSize: updateData.fileSize ?? null // ensure the fileSize is not undefined
}
}
export const markUploadSuccessFactory =
@@ -99,7 +105,7 @@ export const markUploadSuccessFactory =
getObjectAttributes: (params: ObjectKeyPayload) => MaybeAsync<{ fileSize: number }>,
streamId: string,
blobId: string
) => {
): Promise<ProcessingResult> => {
const updateBlobMetadata = updateBlobMetadataFactory(deps)
return await updateBlobMetadata(streamId, blobId, async ({ objectKey }) => {
const { fileSize } = await getObjectAttributes({ objectKey })
@@ -117,7 +123,7 @@ export const markUploadErrorFactory =
streamId: string,
blobId: string,
error: string
) => {
): Promise<ProcessingResult> => {
const updateBlobMetadata = updateBlobMetadataFactory(deps)
return await updateBlobMetadata(streamId, blobId, async ({ objectKey }) => {
await deleteObject({ objectKey })
@@ -127,7 +133,11 @@ export const markUploadErrorFactory =
export const markUploadOverFileSizeLimitFactory =
(deps: UpdateBlobMetadataDeps) =>
async (deleteObject: DeleteObjectFromStorage, streamId: string, blobId: string) => {
async (
deleteObject: DeleteObjectFromStorage,
streamId: string,
blobId: string
): Promise<ProcessingResult> => {
const markUploadError = markUploadErrorFactory(deps)
return await markUploadError(
deleteObject,
@@ -15,28 +15,24 @@ import {
getObjectAttributesFactory,
storeFileStreamFactory
} from '@/modules/blobstorage/repositories/blobs'
import { ensureError } from '@speckle/shared'
import { ensureError, Nullable } from '@speckle/shared'
import { getProjectObjectStorage } from '@/modules/multiregion/utils/blobStorageSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import type { Logger } from '@/observability/logging'
import type { Writable } from 'stream'
import { get } from 'lodash'
import { UploadResult } from '@/modules/blobstorage/domain/types'
import { ProcessingResult } from '@/modules/blobstorage/domain/types'
type NewFileStreamProcessor = (params: {
writeable: Writable
streamId: string
userId: string
onFinishAllFileUploads: (results: UploadResult[]) => void
onFinishAllFileUploads: (results: Array<UploadResult>) => Promise<void>
onError: (err: unknown) => void
logger: Logger
}) => Promise<Writable>
type UploadResult = {
uploadStatus?: number
uploadError?: Error | null | string
formKey: string
}
export const processNewFileStreamFactory = (): NewFileStreamProcessor => {
return async (params) => {
const { writeable, streamId, userId, onFinishAllFileUploads, onError } = params
@@ -46,6 +42,9 @@ export const processNewFileStreamFactory = (): NewFileStreamProcessor => {
uploadStatus?: number
uploadError?: Error | null | string
formKey: string
blobId: string
fileName: string
fileSize: Nullable<number>
}>[] = []
const [projectDb, projectStorage] = await Promise.all([
@@ -82,12 +81,7 @@ export const processNewFileStreamFactory = (): NewFileStreamProcessor => {
const { filename: fileName } = info
const fileType = fileName?.split('.')?.pop()?.toLowerCase()
logger = logger.child({ fileName, fileType })
const registerUploadResult = (
processingPromise: Promise<{
uploadStatus?: number
uploadError?: Error | null | string
}>
) => {
const registerUploadResult = (processingPromise: Promise<ProcessingResult>) => {
finalizePromises.push(
processingPromise.then((resultItem) => ({ ...resultItem, formKey }))
)
@@ -141,7 +135,7 @@ export const processNewFileStreamFactory = (): NewFileStreamProcessor => {
await Promise.all(Object.values(uploadOperations))
// have to make sure all finalize promises have been awaited
const uploadResults = await Promise.all(finalizePromises)
onFinishAllFileUploads(uploadResults)
await onFinishAllFileUploads(uploadResults)
return
})
+6 -114
View File
@@ -1,28 +1,18 @@
/* istanbul ignore file */
import { insertNewUploadAndNotifyFactory } from '@/modules/fileuploads/services/management'
import request from 'request'
import { authMiddlewareCreator } from '@/modules/shared/middleware'
import { moduleLogger } from '@/observability/logging'
import {
onFileImportProcessedFactory,
onFileProcessingFactory,
parseMessagePayload
} from '@/modules/fileuploads/services/resultListener'
import {
getFileInfoFactory,
saveUploadFileFactory
} from '@/modules/fileuploads/repositories/fileUploads'
import { db } from '@/db/knex'
import { publish } from '@/modules/shared/utils/subscriptions'
import { getFileInfoFactory } from '@/modules/fileuploads/repositories/fileUploads'
import { FileImportSubscriptions, publish } from '@/modules/shared/utils/subscriptions'
import { SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { streamWritePermissionsPipelineFactory } from '@/modules/shared/authz'
import { getRolesFactory } from '@/modules/shared/repositories/roles'
import { getStreamBranchByNameFactory } from '@/modules/core/repositories/branches'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { getPort, isFileUploadsEnabled } from '@/modules/shared/helpers/envHelper'
import { isFileUploadsEnabled } from '@/modules/shared/helpers/envHelper'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import { listenFor } from '@/modules/core/utils/dbNotificationListener'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { fileuploadRouterFactory } from '@/modules/fileuploads/rest/router'
export const init: SpeckleModule['init'] = async (app, isInitial) => {
if (!isFileUploadsEnabled()) {
@@ -31,99 +21,10 @@ export const init: SpeckleModule['init'] = async (app, isInitial) => {
}
moduleLogger.info('📄 Init FileUploads module')
app.post(
'/api/file/:fileType/:streamId/:branchName?',
async (req, res, next) => {
await authMiddlewareCreator(
streamWritePermissionsPipelineFactory({
getRoles: getRolesFactory({ db }),
getStream: getStreamFactory({ db })
})
)(req, res, next)
},
async (req, res) => {
const branchName = req.params.branchName || 'main'
req.log = req.log.child({
streamId: req.params.streamId,
userId: req.context.userId,
branchName
})
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
const insertNewUploadAndNotify = insertNewUploadAndNotifyFactory({
getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }),
saveUploadFile: saveUploadFileFactory({ db: projectDb }),
publish
})
const saveFileUploads = async ({
userId,
streamId,
branchName,
uploadResults
}: {
userId: string
streamId: string
branchName: string
uploadResults: Array<{
blobId: string
fileName: string
fileSize: number
}>
}) => {
await Promise.all(
uploadResults.map(async (upload) => {
await insertNewUploadAndNotify({
fileId: upload.blobId,
streamId,
branchName,
userId,
fileName: upload.fileName,
fileType: upload.fileName.split('.').pop()!,
fileSize: upload.fileSize
})
})
)
}
//TODO refactor packages/server/modules/blobstorage/index.ts to use the service pattern, and then refactor this to call the service directly from here without the http overhead
const pipedReq = request(
// we call this same server on localhost (IPv4) to upload the blob and do not make an external call
`http://127.0.0.1:${getPort()}/api/stream/${req.params.streamId}/blob`,
async (err, response, body) => {
if (err) {
res.log.error(err, 'Error while uploading blob.')
res.status(500).send(err.message)
return
}
if (response.statusCode === 201) {
const { uploadResults } = JSON.parse(body)
await saveFileUploads({
userId: req.context.userId!,
streamId: req.params.streamId,
branchName,
uploadResults
})
} else {
res.log.error(
{
statusCode: response.statusCode,
path: `http://127.0.0.1:${getPort()}/api/stream/${
req.params.streamId
}/blob`
},
'Error while uploading file.'
)
}
res.contentType('application/json')
res.status(response.statusCode).send(body)
}
)
req.pipe(pipedReq as unknown as NodeJS.WritableStream)
}
)
app.use(fileuploadRouterFactory())
if (isInitial) {
listenFor('file_import_update', async (msg) => {
listenFor(FileImportSubscriptions.ProjectFileImportUpdated, async (msg) => {
const parsedMessage = parseMessagePayload(msg.payload)
if (!parsedMessage.streamId) return
const projectDb = await getProjectDbClient({ projectId: parsedMessage.streamId })
@@ -134,14 +35,5 @@ export const init: SpeckleModule['init'] = async (app, isInitial) => {
eventEmit: getEventBus().emit
})(parsedMessage)
})
listenFor('file_import_started', async (msg) => {
const parsedMessage = parseMessagePayload(msg.payload)
if (!parsedMessage.streamId) return
const projectDb = await getProjectDbClient({ projectId: parsedMessage.streamId })
await onFileProcessingFactory({
getFileInfo: getFileInfoFactory({ db: projectDb }),
publish
})(parsedMessage)
})
}
}
@@ -0,0 +1,111 @@
import { Router } from 'express'
import { insertNewUploadAndNotifyFactory } from '@/modules/fileuploads/services/management'
import { authMiddlewareCreator } from '@/modules/shared/middleware'
import { saveUploadFileFactory } from '@/modules/fileuploads/repositories/fileUploads'
import { db } from '@/db/knex'
import { publish } from '@/modules/shared/utils/subscriptions'
import { streamWritePermissionsPipelineFactory } from '@/modules/shared/authz'
import { getRolesFactory } from '@/modules/shared/repositories/roles'
import { getStreamBranchByNameFactory } from '@/modules/core/repositories/branches'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import { createBusboy } from '@/modules/blobstorage/rest/busboy'
import { processNewFileStreamFactory } from '@/modules/blobstorage/services/streams'
import { UnauthorizedError } from '@/modules/shared/errors'
import { Nullable } from '@speckle/shared'
export const fileuploadRouterFactory = (): Router => {
const processNewFileStream = processNewFileStreamFactory()
const app = Router()
app.post(
'/api/file/:fileType/:streamId/:branchName?',
async (req, res, next) => {
await authMiddlewareCreator(
streamWritePermissionsPipelineFactory({
getRoles: getRolesFactory({ db }),
getStream: getStreamFactory({ db })
})
)(req, res, next)
},
async (req, res) => {
const branchName = req.params.branchName || 'main'
const streamId = req.params.streamId
const userId = req.context.userId
if (!userId) {
throw new UnauthorizedError('User not authenticated.')
}
const logger = req.log.child({
streamId,
userId,
branchName
})
const projectDb = await getProjectDbClient({ projectId: streamId })
const insertNewUploadAndNotify = insertNewUploadAndNotifyFactory({
getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }),
saveUploadFile: saveUploadFileFactory({ db: projectDb }),
publish
})
const saveFileUploads = async ({
userId,
streamId,
branchName,
uploadResults
}: {
userId: string
streamId: string
branchName: string
uploadResults: Array<{
blobId: string
fileName: string
fileSize: Nullable<number>
}>
}) => {
await Promise.all(
uploadResults.map(async (upload) => {
await insertNewUploadAndNotify({
fileId: upload.blobId,
streamId,
branchName,
userId,
fileName: upload.fileName,
fileType: upload.fileName?.split('.').pop() || '', //FIXME
fileSize: upload.fileSize
})
})
)
}
const busboy = createBusboy(req)
const newFileStreamProcessor = await processNewFileStream({
writeable: busboy,
streamId,
userId,
logger,
onFinishAllFileUploads: async (uploadResults) => {
await saveFileUploads({
userId: req.context.userId!,
streamId: req.params.streamId,
branchName,
uploadResults
})
res.status(201).send({ uploadResults })
},
onError: () => {
res.contentType('application/json')
res
.status(400)
.end(
'{ "error": "Upload request error. The server logs may have more details." }'
)
}
})
req.pipe(newFileStreamProcessor)
}
)
return app
}