chore(server): blobstorage IoC 4 - getBlobMetadataFactory (#2960)
* chore(server): blobstorage IoC 4 - getBlobMetadataFactory * tests fix * chore(server): blobstorage IoC 5 - getBlobMetadataCollectionFactory --------- Co-authored-by: Alessandro Magionami <alessandro.magionami@gmail.com>
This commit is contained in:
committed by
GitHub
parent
a5e95159f5
commit
3ccf9c6aea
@@ -2,7 +2,7 @@ import {
|
||||
BlobStorageItem,
|
||||
BlobStorageItemInput
|
||||
} from '@/modules/blobstorage/domain/types'
|
||||
import { MaybeNullOrUndefined } from '@speckle/shared'
|
||||
import { MaybeNullOrUndefined, Nullable } from '@speckle/shared'
|
||||
|
||||
export type GetBlobs = (params: {
|
||||
streamId?: MaybeNullOrUndefined<string>
|
||||
@@ -15,3 +15,15 @@ export type UpdateBlob = (params: {
|
||||
id: string
|
||||
item: Partial<BlobStorageItem>
|
||||
}) => Promise<BlobStorageItem>
|
||||
|
||||
export type GetBlobMetadata = (params: {
|
||||
blobId: string
|
||||
streamId: string
|
||||
}) => Promise<BlobStorageItem>
|
||||
|
||||
export type GetBlobMetadataCollection = (params: {
|
||||
streamId: string
|
||||
query?: Nullable<string>
|
||||
limit?: Nullable<number>
|
||||
cursor?: Nullable<string>
|
||||
}) => Promise<{ blobs: BlobStorageItem[]; cursor: Nullable<string> }>
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
import { BlobStorageRecord } from '@/modules/blobstorage/helpers/types'
|
||||
import { db } from '@/db/knex'
|
||||
import {
|
||||
getBlobMetadata,
|
||||
getBlobMetadataCollection,
|
||||
blobCollectionSummary,
|
||||
getFileSizeLimit
|
||||
} from '@/modules/blobstorage/services'
|
||||
getBlobMetadataCollectionFactory,
|
||||
getBlobMetadataFactory
|
||||
} from '@/modules/blobstorage/repositories'
|
||||
import { blobCollectionSummary, getFileSizeLimit } from '@/modules/blobstorage/services'
|
||||
import {
|
||||
ProjectBlobArgs,
|
||||
ProjectBlobsArgs,
|
||||
@@ -18,7 +17,9 @@ import {
|
||||
NotFoundError,
|
||||
ResourceMismatch
|
||||
} from '@/modules/shared/errors'
|
||||
import { Nullable } from '@speckle/shared'
|
||||
|
||||
const getBlobMetadata = getBlobMetadataFactory({ db })
|
||||
const getBlobMetadataCollection = getBlobMetadataCollectionFactory({ db })
|
||||
|
||||
const streamBlobResolvers = {
|
||||
async blobs(parent: StreamGraphQLReturn, args: StreamBlobsArgs | ProjectBlobsArgs) {
|
||||
@@ -44,10 +45,10 @@ const streamBlobResolvers = {
|
||||
},
|
||||
async blob(parent: StreamGraphQLReturn, args: StreamBlobArgs | ProjectBlobArgs) {
|
||||
try {
|
||||
return (await getBlobMetadata({
|
||||
return await getBlobMetadata({
|
||||
streamId: parent.id,
|
||||
blobId: args.id
|
||||
})) as Nullable<BlobStorageRecord>
|
||||
})
|
||||
} catch (err: unknown) {
|
||||
if (err instanceof NotFoundError) return null
|
||||
if (err instanceof ResourceMismatch) throw new BadRequestError(err.message)
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
import { BadRequestError } from '@/modules/shared/errors'
|
||||
|
||||
export const cursorFromRows = <Row, Target extends keyof Row>(
|
||||
rows: Array<Row>,
|
||||
cursorTarget: Target
|
||||
) => {
|
||||
if (rows?.length > 0) {
|
||||
const lastRow = rows[rows.length - 1]
|
||||
const cursor = lastRow[cursorTarget]
|
||||
if (!(cursor instanceof Date))
|
||||
throw new BadRequestError('The cursor target is not a date object')
|
||||
|
||||
return Buffer.from(cursor.toISOString()).toString('base64')
|
||||
} else {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
export const decodeCursor = (cursor: string) => {
|
||||
const decoded = Buffer.from(cursor, 'base64').toString()
|
||||
if (isNaN(Date.parse(decoded)))
|
||||
throw new BadRequestError('The cursor is not a base64 encoded date string')
|
||||
|
||||
return decoded
|
||||
}
|
||||
@@ -1,13 +1,3 @@
|
||||
export type BlobStorageRecord = {
|
||||
id: string
|
||||
streamId: string
|
||||
userId: string | null
|
||||
objectKey: string | null
|
||||
fileName: string
|
||||
fileType: string
|
||||
fileSize: number | null
|
||||
uploadStatus: number
|
||||
uploadError: string | null
|
||||
createdAt: Date
|
||||
fileHash: string | null
|
||||
}
|
||||
import { BlobStorageItem } from '@/modules/blobstorage/domain/types'
|
||||
|
||||
export type BlobStorageRecord = BlobStorageItem
|
||||
|
||||
@@ -22,8 +22,6 @@ const {
|
||||
markUploadSuccess,
|
||||
markUploadOverFileSizeLimit,
|
||||
deleteBlob,
|
||||
getBlobMetadata,
|
||||
getBlobMetadataCollection,
|
||||
getFileSizeLimit
|
||||
} = require('@/modules/blobstorage/services')
|
||||
|
||||
@@ -38,7 +36,9 @@ const { moduleLogger, logger } = require('@/logging/logging')
|
||||
const {
|
||||
getAllStreamBlobIdsFactory,
|
||||
upsertBlobFactory,
|
||||
updateBlobFactory
|
||||
updateBlobFactory,
|
||||
getBlobMetadataFactory,
|
||||
getBlobMetadataCollectionFactory
|
||||
} = require('@/modules/blobstorage/repositories')
|
||||
const { db } = require('@/db/knex')
|
||||
const { uploadFileStreamFactory } = require('@/modules/blobstorage/services/upload')
|
||||
@@ -48,6 +48,8 @@ const uploadFileStream = uploadFileStreamFactory({
|
||||
upsertBlob: upsertBlobFactory({ db }),
|
||||
updateBlob: updateBlobFactory({ db })
|
||||
})
|
||||
const getBlobMetadata = getBlobMetadataFactory({ db })
|
||||
const getBlobMetadataCollection = getBlobMetadataCollectionFactory({ db })
|
||||
|
||||
const ensureConditions = async () => {
|
||||
if (process.env.DISABLE_FILE_UPLOADS) {
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import {
|
||||
GetBlobMetadata,
|
||||
GetBlobMetadataCollection,
|
||||
GetBlobs,
|
||||
UpdateBlob,
|
||||
UpsertBlob
|
||||
@@ -7,7 +9,13 @@ import {
|
||||
BlobStorageItem,
|
||||
BlobStorageItemInput
|
||||
} from '@/modules/blobstorage/domain/types'
|
||||
import { cursorFromRows, decodeCursor } from '@/modules/blobstorage/helpers/db'
|
||||
import { buildTableHelper } from '@/modules/core/dbSchema'
|
||||
import {
|
||||
BadRequestError,
|
||||
NotFoundError,
|
||||
ResourceMismatch
|
||||
} from '@/modules/shared/errors'
|
||||
import { Knex } from 'knex'
|
||||
|
||||
const BlobStorage = buildTableHelper('blob_storage', [
|
||||
@@ -73,3 +81,44 @@ export const updateBlobFactory =
|
||||
.update(item, '*')
|
||||
return res
|
||||
}
|
||||
|
||||
export const getBlobMetadataFactory =
|
||||
(deps: { db: Knex }): GetBlobMetadata =>
|
||||
async (params: { blobId: string; streamId: string }) => {
|
||||
const { blobId, streamId } = params
|
||||
|
||||
if (!streamId) throw new BadRequestError('No steamId provided')
|
||||
const obj =
|
||||
(await tables
|
||||
.blobStorage(deps.db)
|
||||
.where({ [BlobStorage.col.id]: blobId, [BlobStorage.col.streamId]: streamId })
|
||||
.first()) || null
|
||||
|
||||
if (!obj) throw new NotFoundError(`The requested asset: ${blobId} doesn't exist`)
|
||||
if (obj.streamId !== streamId)
|
||||
throw new ResourceMismatch("The stream doesn't have the given resource")
|
||||
|
||||
return obj
|
||||
}
|
||||
|
||||
export const getBlobMetadataCollectionFactory =
|
||||
(deps: { db: Knex }): GetBlobMetadataCollection =>
|
||||
async ({ streamId, query = null, limit = 25, cursor = null }) => {
|
||||
const cursorTarget = 'createdAt'
|
||||
const limitMax = 25
|
||||
const queryLimit = limit && limit < limitMax ? limit : limitMax
|
||||
const blobs = tables
|
||||
.blobStorage(deps.db)
|
||||
.where({ [BlobStorage.col.streamId]: streamId })
|
||||
.orderBy(cursorTarget, 'desc')
|
||||
.limit(queryLimit)
|
||||
|
||||
if (query) blobs.andWhereLike('fileName', `%${query}%`)
|
||||
if (cursor) blobs.andWhere(cursorTarget, '<', decodeCursor(cursor))
|
||||
|
||||
const rows = await blobs
|
||||
return {
|
||||
blobs: rows,
|
||||
cursor: cursorFromRows(rows, cursorTarget)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,82 +1,17 @@
|
||||
const knex = require('@/db/knex')
|
||||
const {
|
||||
NotFoundError,
|
||||
ResourceMismatch,
|
||||
BadRequestError
|
||||
} = require('@/modules/shared/errors')
|
||||
const { getBlobMetadataFactory } = require('@/modules/blobstorage/repositories')
|
||||
const { getFileSizeLimitMB } = require('@/modules/shared/helpers/envHelper')
|
||||
const BlobStorage = () => knex('blob_storage')
|
||||
|
||||
const blobLookup = ({ blobId, streamId }) =>
|
||||
BlobStorage().where({ id: blobId, streamId })
|
||||
|
||||
/**
|
||||
* @returns {import('@/modules/blobstorage/helpers/types').BlobStorageRecord | null}
|
||||
*/
|
||||
const getBlobMetadata = async ({ streamId, blobId }, blobRepo = blobLookup) => {
|
||||
if (!streamId) throw new BadRequestError('No steamId provided')
|
||||
const obj = (await blobRepo({ blobId, streamId }).first()) || null
|
||||
if (!obj) throw new NotFoundError(`The requested asset: ${blobId} doesn't exist`)
|
||||
if (obj.streamId !== streamId)
|
||||
throw new ResourceMismatch("The stream doesn't have the given resource")
|
||||
return obj
|
||||
}
|
||||
|
||||
const blobQuery = ({ streamId, query }) => {
|
||||
let blobs = BlobStorage().where({ streamId })
|
||||
if (query) blobs = blobs.andWhereLike('fileName', `%${query}%`)
|
||||
return blobs
|
||||
}
|
||||
|
||||
const cursorFromRows = (rows, cursorTarget) => {
|
||||
if (rows?.length > 0) {
|
||||
const lastRow = rows[rows.length - 1]
|
||||
const cursor = lastRow[cursorTarget]
|
||||
if (!(cursor instanceof Date))
|
||||
throw new BadRequestError('The cursor target is not a date object')
|
||||
return Buffer.from(cursor.toISOString()).toString('base64')
|
||||
} else {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
const decodeCursor = (cursor) => {
|
||||
const decoded = Buffer.from(cursor, 'base64').toString()
|
||||
if (isNaN(Date.parse(decoded)))
|
||||
throw new BadRequestError('The cursor is not a base64 encoded date string')
|
||||
return decoded
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {{
|
||||
* streamId: string,
|
||||
* query?: string | null,
|
||||
* limit?: number | null,
|
||||
* cursor?: string | null
|
||||
* }} param0
|
||||
* @returns
|
||||
*/
|
||||
const getBlobMetadataCollection = async ({
|
||||
streamId,
|
||||
query = null,
|
||||
limit = 25,
|
||||
cursor = null
|
||||
}) => {
|
||||
const cursorTarget = 'createdAt'
|
||||
const limitMax = 25
|
||||
const queryLimit = limit && limit < limitMax ? limit : limitMax
|
||||
const blobs = blobQuery({ streamId, query })
|
||||
.orderBy(cursorTarget, 'desc')
|
||||
.limit(queryLimit)
|
||||
if (cursor) blobs.andWhere(cursorTarget, '<', decodeCursor(cursor))
|
||||
|
||||
const rows = await blobs
|
||||
return {
|
||||
blobs: rows,
|
||||
cursor: cursorFromRows(rows, cursorTarget)
|
||||
}
|
||||
}
|
||||
|
||||
const blobCollectionSummary = async ({ streamId, query }) => {
|
||||
const [summary] = await blobQuery({ streamId, query }).sum('fileSize').count('id')
|
||||
return {
|
||||
@@ -86,7 +21,7 @@ const blobCollectionSummary = async ({ streamId, query }) => {
|
||||
}
|
||||
|
||||
const getFileStream = async ({ getObjectStream, streamId, blobId }) => {
|
||||
const { objectKey } = await getBlobMetadata({ streamId, blobId })
|
||||
const { objectKey } = await getBlobMetadataFactory({ db: knex })({ streamId, blobId })
|
||||
return await getObjectStream({ objectKey })
|
||||
}
|
||||
|
||||
@@ -106,13 +41,16 @@ const markUploadError = async (deleteObject, streamId, blobId, error) =>
|
||||
})
|
||||
|
||||
const deleteBlob = async ({ streamId, blobId, deleteObject }) => {
|
||||
const { objectKey } = await getBlobMetadata({ streamId, blobId })
|
||||
const { objectKey } = await getBlobMetadataFactory({ db: knex })({ streamId, blobId })
|
||||
await deleteObject({ objectKey })
|
||||
await blobLookup({ blobId, streamId }).del()
|
||||
}
|
||||
|
||||
const updateBlobMetadata = async (streamId, blobId, updateCallback) => {
|
||||
const { objectKey, fileName } = await getBlobMetadata({ streamId, blobId })
|
||||
const { objectKey, fileName } = await getBlobMetadataFactory({ db: knex })({
|
||||
streamId,
|
||||
blobId
|
||||
})
|
||||
const updateData = await updateCallback({ objectKey })
|
||||
await blobLookup({ blobId, streamId }).update(updateData)
|
||||
return { blobId, fileName, ...updateData }
|
||||
@@ -121,15 +59,11 @@ const updateBlobMetadata = async (streamId, blobId, updateCallback) => {
|
||||
const getFileSizeLimit = () => getFileSizeLimitMB() * 1024 * 1024
|
||||
|
||||
module.exports = {
|
||||
cursorFromRows,
|
||||
decodeCursor,
|
||||
getBlobMetadata,
|
||||
markUploadSuccess,
|
||||
markUploadOverFileSizeLimit,
|
||||
markUploadError,
|
||||
getFileStream,
|
||||
deleteBlob,
|
||||
getBlobMetadataCollection,
|
||||
blobCollectionSummary,
|
||||
getFileSizeLimit
|
||||
}
|
||||
|
||||
@@ -1,35 +1,35 @@
|
||||
const expect = require('chai').expect
|
||||
const { beforeEachContext } = require('@/test/hooks')
|
||||
const {
|
||||
getBlobMetadata,
|
||||
getBlobMetadataCollection,
|
||||
cursorFromRows,
|
||||
decodeCursor,
|
||||
blobCollectionSummary,
|
||||
getFileStream,
|
||||
deleteBlob,
|
||||
markUploadOverFileSizeLimit,
|
||||
markUploadSuccess
|
||||
} = require('@/modules/blobstorage/services')
|
||||
const {
|
||||
NotFoundError,
|
||||
ResourceMismatch,
|
||||
BadRequestError
|
||||
} = require('@/modules/shared/errors')
|
||||
const { NotFoundError, BadRequestError } = require('@/modules/shared/errors')
|
||||
const { range } = require('lodash')
|
||||
const { fakeIdGenerator, createBlobs } = require('@/modules/blobstorage/tests/helpers')
|
||||
const { uploadFileStreamFactory } = require('@/modules/blobstorage/services/upload')
|
||||
const {
|
||||
upsertBlobFactory,
|
||||
updateBlobFactory
|
||||
updateBlobFactory,
|
||||
getBlobMetadataFactory,
|
||||
getBlobMetadataCollectionFactory
|
||||
} = require('@/modules/blobstorage/repositories')
|
||||
const { db } = require('@/db/knex')
|
||||
|
||||
const { cursorFromRows, decodeCursor } = require('@/modules/blobstorage/helpers/db')
|
||||
const { createTestStream } = require('@/test/speckle-helpers/streamHelper')
|
||||
const cryptoRandomString = require('crypto-random-string')
|
||||
const { createTestUser } = require('@/test/authHelper')
|
||||
const fakeFileStreamStore = (fakeHash) => async () => ({ fileHash: fakeHash })
|
||||
const upsertBlob = upsertBlobFactory({ db })
|
||||
const uploadFileStream = uploadFileStreamFactory({
|
||||
upsertBlob: upsertBlobFactory({ db }),
|
||||
upsertBlob,
|
||||
updateBlob: updateBlobFactory({ db })
|
||||
})
|
||||
const getBlobMetadata = getBlobMetadataFactory({ db })
|
||||
const getBlobMetadataCollection = getBlobMetadataCollectionFactory({ db })
|
||||
|
||||
describe('Blob storage @blobstorage', () => {
|
||||
before(async () => {
|
||||
@@ -78,6 +78,38 @@ describe('Blob storage @blobstorage', () => {
|
||||
})
|
||||
|
||||
describe('Get blob metadata', () => {
|
||||
const testUser1 = {
|
||||
name: 'Blob Test User #1',
|
||||
email: 'testUser1@gmailll.com',
|
||||
id: ''
|
||||
}
|
||||
|
||||
const testStream1 = {
|
||||
name: 'Blob Test Stream #1',
|
||||
isPublic: false,
|
||||
ownerId: '',
|
||||
id: ''
|
||||
}
|
||||
|
||||
/**
|
||||
* @type {import('@/modules/blobstorage/domain/types').BlobStorageItem}
|
||||
*/
|
||||
let testStreamBlob1
|
||||
|
||||
before(async () => {
|
||||
// Insert blob
|
||||
await createTestUser(testUser1)
|
||||
await createTestStream(testStream1, testUser1)
|
||||
testStreamBlob1 = await upsertBlob({
|
||||
id: cryptoRandomString({ length: 10 }),
|
||||
streamId: testStream1.id,
|
||||
userId: testUser1.id,
|
||||
objectKey: 'testObjectKey',
|
||||
fileName: 'testFileName',
|
||||
fileType: 'png'
|
||||
})
|
||||
})
|
||||
|
||||
it('when no blob found throws NotFoundError', async () => {
|
||||
try {
|
||||
await getBlobMetadata({ streamId: 'foo', blobId: 'bar' })
|
||||
@@ -88,33 +120,21 @@ describe('Blob storage @blobstorage', () => {
|
||||
})
|
||||
it('when no streamId throws ResourceMismatch', async () => {
|
||||
try {
|
||||
const fakeBlobLookup = () => ({ first: async () => ({ a: 'random blob' }) })
|
||||
await getBlobMetadata({ streamId: null, blobId: 'bar' }, fakeBlobLookup)
|
||||
await getBlobMetadata({ streamId: null, blobId: 'bar' })
|
||||
throw new Error('This should have failed')
|
||||
} catch (err) {
|
||||
if (!(err instanceof BadRequestError)) throw err
|
||||
}
|
||||
})
|
||||
it('when streamIds are not matching throws ResourceMismatch', async () => {
|
||||
try {
|
||||
const fakeBlobLookup = () => ({
|
||||
first: async () => ({ streamId: 'def not THAT one' })
|
||||
})
|
||||
await getBlobMetadata({ streamId: 'this one', blobId: 'bar' }, fakeBlobLookup)
|
||||
throw new Error('This should have failed')
|
||||
} catch (err) {
|
||||
if (!(err instanceof ResourceMismatch)) throw err
|
||||
}
|
||||
})
|
||||
|
||||
it('for valid input return the data', async () => {
|
||||
const streamId = 'the one im looking for'
|
||||
const blobId = 'my dear blobbie'
|
||||
const fakeBlobMetadata = { streamId, blobId }
|
||||
const fakeBlobLookup = () => ({
|
||||
first: async () => fakeBlobMetadata
|
||||
const blobMetadata = await getBlobMetadata({
|
||||
streamId: testStream1.id,
|
||||
blobId: testStreamBlob1.id
|
||||
})
|
||||
const blobMetadata = await getBlobMetadata({ streamId, blobId }, fakeBlobLookup)
|
||||
expect(blobMetadata).to.deep.equal(fakeBlobMetadata)
|
||||
expect(blobMetadata).to.be.ok
|
||||
expect(blobMetadata.streamId).to.eq(testStream1.id)
|
||||
expect(blobMetadata.id).to.eq(testStreamBlob1.id)
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user