Files
speckle-server/packages/server/modules/blobstorage/services/streams.ts
T
Kristaps Fabians Geikins bde148f286 chore(server): migrating fully to ESM (#5042)
* wip

* some extra fixes

* stuff kinda works?

* need to figure out mocks

* need to figure out mocks

* fix db listener

* gqlgen fix

* minor gqlgen watch adjustment

* lint fixes

* delete old codegen file

* converting migrations to ESM

* getModuleDIrectory

* vitest sort of works

* added back ts-vitest

* resolve gql double load

* fixing test timeout configs

* TSC lint fix

* fix automate tests

* moar debugging

* debugging

* more debugging

* codegen update

* server works

* yargs migrated

* chore(server): getting rid of global mocks for Server ESM (#5046)

* got rid of email mock

* got rid of comment mocks

* got rid of multi region mocks

* got rid of stripe mock

* admin override mock updated

* removed final mock

* fixing import.meta.resolve calls

* another import.meta.resolve fix

* added requested test

* nyc ESM fix

* removed unneeded deps + linting

* yarn lock forgot to commit

* tryna fix flakyness

* email capture util fix

* sendEmail fix

* fix TSX check

* sender transporter fix + CR comments

* merge main fix

* test fixx

* circleci fix

* gqlgen bigint fix

* error formatter fix

* more error formatting improvements

* esmloader added to Dockerfile

* more dockerfile fixes

* bg jobs fix
2025-07-14 10:26:19 +03:00

154 lines
5.1 KiB
TypeScript

import crs from 'crypto-random-string'
import {
upsertBlobFactory,
updateBlobFactory,
getBlobMetadataFactory
} from '@/modules/blobstorage/repositories'
import {
uploadFileStreamFactory,
markUploadSuccessFactory,
markUploadErrorFactory,
markUploadOverFileSizeLimitFactory
} from '@/modules/blobstorage/services/management'
import {
deleteObjectFactory,
getObjectAttributesFactory,
storeFileStreamFactory
} from '@/modules/blobstorage/repositories/blobs'
import { ensureError } from '@speckle/shared'
import { getProjectObjectStorage } from '@/modules/multiregion/utils/blobStorageSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import type { Logger } from '@/observability/logging'
import type { Readable, Writable } from 'stream'
import { get } from 'lodash-es'
import type { UploadResult, ProcessingResult } from '@/modules/blobstorage/domain/types'
import type { Busboy } from 'busboy'
/**
 * Signature of the function that prepares a busboy instance for handling
 * uploaded files. It resolves to a writable stream.
 */
type NewFileStreamProcessor = (params: {
  /** Multipart parser instance the processor works with. */
  busboy: Busboy
  /** Logger used while handling the upload request. */
  logger: Logger
  /** Id of the stream (project) the uploaded files belong to. */
  streamId: string
  /** Id of the user performing the upload. */
  userId: string
  /** Called with the results of all file uploads once they are available. */
  onFinishAllFileUploads: (results: UploadResult[]) => Promise<void>
  /** Called with the underlying error when the upload request fails. */
  onError: (err: unknown) => void
}) => Promise<Writable>
/**
 * Builds the processor that connects a busboy instance to blob storage.
 *
 * The returned function looks up the DB client and object storage for the
 * project identified by `streamId`, attaches `file`, `finish` and `error`
 * listeners to `params.busboy`, and resolves to that same busboy instance.
 *
 * The async listeners catch the failures of the promises they await: an
 * EventEmitter does not observe the promise returned by an async listener, so
 * a rejection escaping one would become an unhandled rejection and neither
 * `onFinishAllFileUploads` nor `onError` would be called.
 *
 * @returns a function that wires up `params.busboy` and resolves to it
 */
export const processNewFileStreamFactory = (): NewFileStreamProcessor => {
  return async (params) => {
    const { busboy, streamId, userId, onFinishAllFileUploads, onError, logger } =
      params

    // blobId -> upload operation started for that blob
    const uploadOperations: Record<string, unknown> = {}
    // one entry per file outcome (success / over size limit / error)
    const finalizePromises: Promise<UploadResult>[] = []

    const [projectDb, projectStorage] = await Promise.all([
      getProjectDbClient({ projectId: streamId }),
      getProjectObjectStorage({ projectId: streamId })
    ])

    const storeFileStream = storeFileStreamFactory({ storage: projectStorage.private })
    const updateBlob = updateBlobFactory({ db: projectDb })
    const getBlobMetadata = getBlobMetadataFactory({ db: projectDb })

    const uploadFileStream = uploadFileStreamFactory({
      storeFileStream,
      upsertBlob: upsertBlobFactory({ db: projectDb }),
      updateBlob
    })

    const markUploadSuccess = markUploadSuccessFactory({
      getBlobMetadata,
      updateBlob
    })
    const markUploadError = markUploadErrorFactory({ getBlobMetadata, updateBlob })
    const markUploadOverFileSizeLimit = markUploadOverFileSizeLimitFactory({
      getBlobMetadata,
      updateBlob
    })

    const getObjectAttributes = getObjectAttributesFactory({
      storage: projectStorage.private
    })
    const deleteObject = deleteObjectFactory({ storage: projectStorage.private })

    busboy.on(
      'file',
      (
        formKey: string,
        file: Readable & { truncated?: boolean },
        info: { filename: string; encoding: string; mimeType: string }
      ) => {
        const { filename: fileName } = info
        // everything after the last '.', lower-cased and trimmed
        const fileType = fileName?.split('.')?.pop()?.toLowerCase().trim()
        const blobId = crs({ length: 10 })

        // Derived per file instead of reassigning the request logger, so the
        // context of one file does not leak into logs of other files.
        const fileLogger = logger.child({ fileName, fileType, blobId })

        // Tags the outcome with the form field it came from and queues it for
        // the 'finish' listener.
        const registerUploadResult = (processingPromise: Promise<ProcessingResult>) => {
          finalizePromises.push(
            processingPromise.then((resultItem) => ({ ...resultItem, formKey }))
          )
        }

        const uploadOperation = uploadFileStream(
          { streamId, userId },
          { blobId, fileName, fileType, fileStream: file }
        )
        uploadOperations[blobId] = uploadOperation

        // Waits for this file's upload operation, then registers the outcome
        // produced by `finalize`.
        const finalizeAfterUpload = async (
          finalize: () => Promise<ProcessingResult>
        ) => {
          try {
            await uploadOperation
          } catch (err) {
            // Nothing to finalize. The 'finish' listener awaits the same
            // operation and reports the failure through onError.
            fileLogger.error({ err }, 'Blob upload operation failed.')
            return
          }
          registerUploadResult(finalize())
        }

        //this file level 'close' is fired when a single file upload finishes
        //this way individual upload statuses can be updated, when done
        file.on('close', () => {
          //this is handled by the file.on('limit', ...) event
          if (file.truncated) return
          void finalizeAfterUpload(() =>
            markUploadSuccess(getObjectAttributes, streamId, blobId)
          )
        })

        file.on('limit', () => {
          void finalizeAfterUpload(() =>
            markUploadOverFileSizeLimit(deleteObject, streamId, blobId)
          )
        })

        file.on('error', (err: unknown) => {
          registerUploadResult(
            markUploadError(deleteObject, streamId, blobId, get(err, 'message'))
          )
        })
      }
    )

    busboy.on('finish', async () => {
      try {
        // make sure all upload operations have been awaited,
        // otherwise the finish event can fire before all async operations finish
        // resulting in missing return values
        await Promise.all(Object.values(uploadOperations))
        // have to make sure all finalize promises have been awaited
        const uploadResults = await Promise.all(finalizePromises)
        await onFinishAllFileUploads(uploadResults)
      } catch (err) {
        // Without this the rejection would be unhandled and the caller would
        // never be told that the request failed.
        logger.error({ err }, 'Failed to finalize blob uploads.')
        onError(err)
      }
    })

    busboy.on('error', async (err) => {
      logger.info({ err }, 'Upload request error.')
      try {
        const message = ensureError(err, 'Unknown error while uploading blob').message
        // mark every started upload as errored; markUploadError is handed
        // deleteObject for the stored data
        await Promise.all(
          Object.keys(uploadOperations).map((blobId) =>
            markUploadError(deleteObject, streamId, blobId, message)
          )
        )
      } catch (cleanupErr) {
        // A failed cleanup must not prevent the caller from being notified.
        logger.error(
          { err: cleanupErr },
          'Failed to mark started uploads as errored after upload request error.'
        )
      }
      onError(err)
    })

    return busboy
  }
}