190 lines
5.7 KiB
JavaScript
190 lines
5.7 KiB
JavaScript
const knex = require('@/db/knex')
|
|
const {
|
|
NotFoundError,
|
|
ResourceMismatch,
|
|
BadRequestError
|
|
} = require('@/modules/shared/errors')
|
|
const { getFileSizeLimitMB } = require('@/modules/shared/helpers/envHelper')
|
|
const BlobStorage = () => knex('blob_storage')
|
|
|
|
const blobLookup = ({ blobId, streamId }) =>
|
|
BlobStorage().where({ id: blobId, streamId })
|
|
|
|
/**
|
|
* Get blobs - use only internally, as this doesn't require a streamId
|
|
*/
|
|
const getBlobs = async ({ streamId, blobIds }) => {
|
|
const q = BlobStorage().whereIn('id', blobIds)
|
|
if (streamId) {
|
|
q.andWhere('streamId', streamId)
|
|
}
|
|
|
|
return await q
|
|
}
|
|
|
|
const getAllStreamBlobIds = async ({ streamId }) => {
|
|
const res = await BlobStorage().where({ streamId }).select('id')
|
|
return res
|
|
}
|
|
|
|
/**
|
|
* Get a single blob - use only internally, as this doesn't require a streamId
|
|
*/
|
|
const getBlob = async ({ streamId, blobId }) => {
|
|
const blobs = await getBlobs({ streamId, blobIds: [blobId] })
|
|
return blobs?.length ? blobs[0] : null
|
|
}
|
|
|
|
const uploadFileStream = async (
|
|
storeFileStream,
|
|
{ streamId, userId },
|
|
{ blobId, fileName, fileType, fileStream }
|
|
) => {
|
|
if (streamId.length !== 10)
|
|
throw new BadRequestError('The stream id has to be of length 10')
|
|
if (userId.length !== 10)
|
|
throw new BadRequestError('The user id has to be of length 10')
|
|
const objectKey = `assets/${streamId}/${blobId}`
|
|
const dbFile = {
|
|
id: blobId,
|
|
streamId,
|
|
userId,
|
|
objectKey,
|
|
fileName,
|
|
fileType
|
|
}
|
|
// need to insert the upload data before starting otherwise the upload finished
|
|
// even might fire faster, than the db insert, causing missing asset data in the db
|
|
await BlobStorage().insert(dbFile).onConflict(['id', 'streamId']).ignore()
|
|
|
|
const { fileHash } = await storeFileStream({ objectKey, fileStream })
|
|
// here we should also update the blob db record with the fileHash
|
|
await BlobStorage().where({ id: blobId }).update({ fileHash })
|
|
return { blobId, fileName, fileHash }
|
|
}
|
|
|
|
const getBlobMetadata = async ({ streamId, blobId }, blobRepo = blobLookup) => {
|
|
if (!streamId) throw new BadRequestError('No steamId provided')
|
|
const obj = (await blobRepo({ blobId, streamId }).first()) || null
|
|
if (!obj) throw new NotFoundError(`The requested asset: ${blobId} doesn't exist`)
|
|
if (obj.streamId !== streamId)
|
|
throw new ResourceMismatch("The stream doesn't have the given resource")
|
|
return obj
|
|
}
|
|
|
|
const blobQuery = ({ streamId, query }) => {
|
|
let blobs = BlobStorage().where({ streamId })
|
|
if (query) blobs = blobs.andWhereLike('fileName', `%${query}%`)
|
|
return blobs
|
|
}
|
|
|
|
const cursorFromRows = (rows, cursorTarget) => {
|
|
if (rows?.length > 0) {
|
|
const lastRow = rows[rows.length - 1]
|
|
const cursor = lastRow[cursorTarget]
|
|
if (!(cursor instanceof Date))
|
|
throw new BadRequestError('The cursor target is not a date object')
|
|
return Buffer.from(cursor.toISOString()).toString('base64')
|
|
} else {
|
|
return null
|
|
}
|
|
}
|
|
|
|
const decodeCursor = (cursor) => {
|
|
const decoded = Buffer.from(cursor, 'base64').toString()
|
|
if (isNaN(Date.parse(decoded)))
|
|
throw new BadRequestError('The cursor is not a base64 encoded date string')
|
|
return decoded
|
|
}
|
|
|
|
/**
|
|
* @param {{
|
|
* streamId: string,
|
|
* query?: string | null,
|
|
* limit?: number | null,
|
|
* cursor?: string | null
|
|
* }} param0
|
|
* @returns
|
|
*/
|
|
const getBlobMetadataCollection = async ({
|
|
streamId,
|
|
query = null,
|
|
limit = 25,
|
|
cursor = null
|
|
}) => {
|
|
const cursorTarget = 'createdAt'
|
|
const limitMax = 25
|
|
const queryLimit = limit && limit < limitMax ? limit : limitMax
|
|
const blobs = blobQuery({ streamId, query })
|
|
.orderBy(cursorTarget, 'desc')
|
|
.limit(queryLimit)
|
|
if (cursor) blobs.andWhere(cursorTarget, '<', decodeCursor(cursor))
|
|
|
|
const rows = await blobs
|
|
return {
|
|
blobs: rows,
|
|
cursor: cursorFromRows(rows, cursorTarget)
|
|
}
|
|
}
|
|
|
|
const blobCollectionSummary = async ({ streamId, query }) => {
|
|
const [summary] = await blobQuery({ streamId, query }).sum('fileSize').count('id')
|
|
return {
|
|
totalSize: summary.sum ? parseInt(summary.sum) : 0,
|
|
totalCount: parseInt(summary.count)
|
|
}
|
|
}
|
|
|
|
const getFileStream = async ({ getObjectStream, streamId, blobId }) => {
|
|
const { objectKey } = await getBlobMetadata({ streamId, blobId })
|
|
return await getObjectStream({ objectKey })
|
|
}
|
|
|
|
const markUploadSuccess = async (getObjectAttributes, streamId, blobId) =>
|
|
await updateBlobMetadata(streamId, blobId, async ({ objectKey }) => {
|
|
const { fileSize } = await getObjectAttributes({ objectKey })
|
|
return { uploadStatus: 1, fileSize }
|
|
})
|
|
|
|
const markUploadOverFileSizeLimit = async (deleteObject, streamId, blobId) =>
|
|
await markUploadError(deleteObject, streamId, blobId, 'File size limit reached')
|
|
|
|
const markUploadError = async (deleteObject, streamId, blobId, error) =>
|
|
await updateBlobMetadata(streamId, blobId, async ({ objectKey }) => {
|
|
await deleteObject({ objectKey })
|
|
return { uploadStatus: 2, uploadError: error }
|
|
})
|
|
|
|
const deleteBlob = async ({ streamId, blobId, deleteObject }) => {
|
|
const { objectKey } = await getBlobMetadata({ streamId, blobId })
|
|
await deleteObject({ objectKey })
|
|
await blobLookup({ blobId, streamId }).del()
|
|
}
|
|
|
|
const updateBlobMetadata = async (streamId, blobId, updateCallback) => {
|
|
const { objectKey, fileName } = await getBlobMetadata({ streamId, blobId })
|
|
const updateData = await updateCallback({ objectKey })
|
|
await blobLookup({ blobId, streamId }).update(updateData)
|
|
return { blobId, fileName, ...updateData }
|
|
}
|
|
|
|
const getFileSizeLimit = () => getFileSizeLimitMB() * 1024 * 1024
|
|
|
|
module.exports = {
|
|
cursorFromRows,
|
|
decodeCursor,
|
|
getBlobMetadata,
|
|
uploadFileStream,
|
|
markUploadSuccess,
|
|
markUploadOverFileSizeLimit,
|
|
markUploadError,
|
|
getFileStream,
|
|
deleteBlob,
|
|
getBlobMetadataCollection,
|
|
blobCollectionSummary,
|
|
getBlobs,
|
|
getBlob,
|
|
getAllStreamBlobIds,
|
|
getFileSizeLimit
|
|
}
|