chore(server): blobstorage IoC 3 - uploadFileStreamFactory
This commit is contained in:
@@ -0,0 +1,17 @@
|
||||
import {
|
||||
BlobStorageItem,
|
||||
BlobStorageItemInput
|
||||
} from '@/modules/blobstorage/domain/types'
|
||||
import { MaybeNullOrUndefined } from '@speckle/shared'
|
||||
|
||||
export type GetBlobs = (params: {
|
||||
streamId?: MaybeNullOrUndefined<string>
|
||||
blobIds: string[]
|
||||
}) => Promise<BlobStorageItem[]>
|
||||
|
||||
export type UpsertBlob = (item: BlobStorageItemInput) => Promise<BlobStorageItem>
|
||||
|
||||
export type UpdateBlob = (params: {
|
||||
id: string
|
||||
item: Partial<BlobStorageItem>
|
||||
}) => Promise<BlobStorageItem>
|
||||
@@ -1,7 +0,0 @@
|
||||
import { BlobStorageItem } from '@/modules/blobstorage/domain/types'
|
||||
import { MaybeNullOrUndefined } from '@speckle/shared'
|
||||
|
||||
export type GetBlobs = (params: {
|
||||
streamId?: MaybeNullOrUndefined<string>
|
||||
blobIds: string[]
|
||||
}) => Promise<BlobStorageItem[]>
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Nullable } from '@speckle/shared'
|
||||
import { SetOptional } from 'type-fest'
|
||||
|
||||
export type BlobStorageItem = {
|
||||
id: string
|
||||
@@ -13,3 +14,8 @@ export type BlobStorageItem = {
|
||||
createdAt: Date
|
||||
fileHash: Nullable<string>
|
||||
}
|
||||
|
||||
export type BlobStorageItemInput = SetOptional<
|
||||
BlobStorageItem,
|
||||
'fileSize' | 'uploadStatus' | 'uploadError' | 'createdAt' | 'fileHash'
|
||||
>
|
||||
|
||||
@@ -17,7 +17,6 @@ const crs = require('crypto-random-string')
|
||||
const { authMiddlewareCreator } = require('@/modules/shared/middleware')
|
||||
|
||||
const {
|
||||
uploadFileStream,
|
||||
getFileStream,
|
||||
markUploadError,
|
||||
markUploadSuccess,
|
||||
@@ -36,10 +35,19 @@ const {
|
||||
BadRequestError
|
||||
} = require('@/modules/shared/errors')
|
||||
const { moduleLogger, logger } = require('@/logging/logging')
|
||||
const { getAllStreamBlobIdsFactory } = require('@/modules/blobstorage/repositories')
|
||||
const {
|
||||
getAllStreamBlobIdsFactory,
|
||||
upsertBlobFactory,
|
||||
updateBlobFactory
|
||||
} = require('@/modules/blobstorage/repositories')
|
||||
const { db } = require('@/db/knex')
|
||||
const { uploadFileStreamFactory } = require('@/modules/blobstorage/services/upload')
|
||||
|
||||
const getAllStreamBlobIds = getAllStreamBlobIdsFactory({ db })
|
||||
const uploadFileStream = uploadFileStreamFactory({
|
||||
upsertBlob: upsertBlobFactory({ db }),
|
||||
updateBlob: updateBlobFactory({ db })
|
||||
})
|
||||
|
||||
const ensureConditions = async () => {
|
||||
if (process.env.DISABLE_FILE_UPLOADS) {
|
||||
|
||||
@@ -1,5 +1,12 @@
|
||||
import { GetBlobs } from '@/modules/blobstorage/domain/operationts'
|
||||
import { BlobStorageItem } from '@/modules/blobstorage/domain/types'
|
||||
import {
|
||||
GetBlobs,
|
||||
UpdateBlob,
|
||||
UpsertBlob
|
||||
} from '@/modules/blobstorage/domain/operations'
|
||||
import {
|
||||
BlobStorageItem,
|
||||
BlobStorageItemInput
|
||||
} from '@/modules/blobstorage/domain/types'
|
||||
import { buildTableHelper } from '@/modules/core/dbSchema'
|
||||
import { Knex } from 'knex'
|
||||
|
||||
@@ -43,3 +50,26 @@ export const getAllStreamBlobIdsFactory =
|
||||
const res = await tables.blobStorage(deps.db).where({ streamId }).select('id')
|
||||
return res
|
||||
}
|
||||
|
||||
export const upsertBlobFactory =
|
||||
(deps: { db: Knex }): UpsertBlob =>
|
||||
async (item: BlobStorageItemInput) => {
|
||||
const [res] = await tables
|
||||
.blobStorage(deps.db)
|
||||
.insert(item)
|
||||
.onConflict(['id', 'streamId'])
|
||||
.ignore()
|
||||
.returning('*')
|
||||
return res
|
||||
}
|
||||
|
||||
export const updateBlobFactory =
|
||||
(deps: { db: Knex }): UpdateBlob =>
|
||||
async (params: { id: string; item: Partial<BlobStorageItem> }) => {
|
||||
const { id, item } = params
|
||||
const [res] = await tables
|
||||
.blobStorage(deps.db)
|
||||
.where(BlobStorage.col.id, id)
|
||||
.update(item, '*')
|
||||
return res
|
||||
}
|
||||
|
||||
@@ -10,34 +10,6 @@ const BlobStorage = () => knex('blob_storage')
|
||||
const blobLookup = ({ blobId, streamId }) =>
|
||||
BlobStorage().where({ id: blobId, streamId })
|
||||
|
||||
const uploadFileStream = async (
|
||||
storeFileStream,
|
||||
{ streamId, userId },
|
||||
{ blobId, fileName, fileType, fileStream }
|
||||
) => {
|
||||
if (streamId.length !== 10)
|
||||
throw new BadRequestError('The stream id has to be of length 10')
|
||||
if (userId.length !== 10)
|
||||
throw new BadRequestError('The user id has to be of length 10')
|
||||
const objectKey = `assets/${streamId}/${blobId}`
|
||||
const dbFile = {
|
||||
id: blobId,
|
||||
streamId,
|
||||
userId,
|
||||
objectKey,
|
||||
fileName,
|
||||
fileType
|
||||
}
|
||||
// need to insert the upload data before starting otherwise the upload finished
|
||||
// even might fire faster, than the db insert, causing missing asset data in the db
|
||||
await BlobStorage().insert(dbFile).onConflict(['id', 'streamId']).ignore()
|
||||
|
||||
const { fileHash } = await storeFileStream({ objectKey, fileStream })
|
||||
// here we should also update the blob db record with the fileHash
|
||||
await BlobStorage().where({ id: blobId }).update({ fileHash })
|
||||
return { blobId, fileName, fileHash }
|
||||
}
|
||||
|
||||
/**
|
||||
* @returns {import('@/modules/blobstorage/helpers/types').BlobStorageRecord | null}
|
||||
*/
|
||||
@@ -152,7 +124,6 @@ module.exports = {
|
||||
cursorFromRows,
|
||||
decodeCursor,
|
||||
getBlobMetadata,
|
||||
uploadFileStream,
|
||||
markUploadSuccess,
|
||||
markUploadOverFileSizeLimit,
|
||||
markUploadError,
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
import { UpdateBlob, UpsertBlob } from '@/modules/blobstorage/domain/operations'
|
||||
import { BadRequestError } from '@/modules/shared/errors'
|
||||
|
||||
export const uploadFileStreamFactory =
|
||||
(deps: { upsertBlob: UpsertBlob; updateBlob: UpdateBlob }) =>
|
||||
async (
|
||||
storeFileStream: (params: {
|
||||
objectKey: string
|
||||
fileStream: Buffer
|
||||
}) => Promise<{ fileHash: string }>,
|
||||
params1: { streamId: string; userId: string },
|
||||
params2: { blobId: string; fileName: string; fileType: string; fileStream: Buffer }
|
||||
) => {
|
||||
const { streamId, userId } = params1
|
||||
const { blobId, fileName, fileType, fileStream } = params2
|
||||
|
||||
if (streamId.length !== 10)
|
||||
throw new BadRequestError('The stream id has to be of length 10')
|
||||
if (userId.length !== 10)
|
||||
throw new BadRequestError('The user id has to be of length 10')
|
||||
|
||||
const objectKey = `assets/${streamId}/${blobId}`
|
||||
const dbFile = {
|
||||
id: blobId,
|
||||
streamId,
|
||||
userId,
|
||||
objectKey,
|
||||
fileName,
|
||||
fileType
|
||||
}
|
||||
// need to insert the upload data before starting otherwise the upload finished
|
||||
// even might fire faster, than the db insert, causing missing asset data in the db
|
||||
await deps.upsertBlob(dbFile)
|
||||
|
||||
const { fileHash } = await storeFileStream({ objectKey, fileStream })
|
||||
|
||||
// here we should also update the blob db record with the fileHash
|
||||
await deps.updateBlob({ id: blobId, item: { fileHash } })
|
||||
|
||||
return { blobId, fileName, fileHash }
|
||||
}
|
||||
@@ -1,7 +1,6 @@
|
||||
const expect = require('chai').expect
|
||||
const { beforeEachContext } = require('@/test/hooks')
|
||||
const {
|
||||
uploadFileStream,
|
||||
getBlobMetadata,
|
||||
getBlobMetadataCollection,
|
||||
cursorFromRows,
|
||||
@@ -19,8 +18,18 @@ const {
|
||||
} = require('@/modules/shared/errors')
|
||||
const { range } = require('lodash')
|
||||
const { fakeIdGenerator, createBlobs } = require('@/modules/blobstorage/tests/helpers')
|
||||
const { uploadFileStreamFactory } = require('@/modules/blobstorage/services/upload')
|
||||
const {
|
||||
upsertBlobFactory,
|
||||
updateBlobFactory
|
||||
} = require('@/modules/blobstorage/repositories')
|
||||
const { db } = require('@/db/knex')
|
||||
|
||||
const fakeFileStreamStore = (fakeHash) => async () => ({ fileHash: fakeHash })
|
||||
const uploadFileStream = uploadFileStreamFactory({
|
||||
upsertBlob: upsertBlobFactory({ db }),
|
||||
updateBlob: updateBlobFactory({ db })
|
||||
})
|
||||
|
||||
describe('Blob storage @blobstorage', () => {
|
||||
before(async () => {
|
||||
|
||||
@@ -5,7 +5,17 @@ import { GendoAIRenderRecord } from '@/modules/gendo/helpers/types'
|
||||
import { ProjectSubscriptions, publish } from '@/modules/shared/utils/subscriptions'
|
||||
import { Merge } from 'type-fest'
|
||||
import { storeFileStream } from '@/modules/blobstorage/objectStorage'
|
||||
import { uploadFileStream } from '@/modules/blobstorage/services'
|
||||
import { uploadFileStreamFactory } from '@/modules/blobstorage/services/upload'
|
||||
import {
|
||||
updateBlobFactory,
|
||||
upsertBlobFactory
|
||||
} from '@/modules/blobstorage/repositories'
|
||||
import { db } from '@/db/knex'
|
||||
|
||||
const uploadFileStream = uploadFileStreamFactory({
|
||||
upsertBlob: upsertBlobFactory({ db }),
|
||||
updateBlob: updateBlobFactory({ db })
|
||||
})
|
||||
|
||||
export async function createGendoAIRenderRequest(
|
||||
input: GendoAiRenderInput & {
|
||||
|
||||
Reference in New Issue
Block a user