From f75b77e899409ea7224d367a01d74726e557fe5b Mon Sep 17 00:00:00 2001 From: Iain Sproat <68657+iainsproat@users.noreply.github.com> Date: Mon, 31 Mar 2025 12:03:43 +0100 Subject: [PATCH] push work on to database, simplify application logic --- .../modules/fileuploads/domain/operations.ts | 24 +++------- packages/server/modules/fileuploads/index.ts | 14 +++--- .../fileuploads/repositories/fileUploads.ts | 47 ++++++++----------- .../fileuploads/services/management.ts | 27 +++++------ .../modules/fileuploads/services/tasks.ts | 39 +++++---------- .../fileuploads/tests/fileuploads.spec.ts | 14 +++--- 6 files changed, 65 insertions(+), 100 deletions(-) diff --git a/packages/server/modules/fileuploads/domain/operations.ts b/packages/server/modules/fileuploads/domain/operations.ts index 0184d783b..3316b7a62 100644 --- a/packages/server/modules/fileuploads/domain/operations.ts +++ b/packages/server/modules/fileuploads/domain/operations.ts @@ -1,7 +1,4 @@ -import { - FileUploadConvertedStatus, - FileUploadRecord -} from '@/modules/fileuploads/helpers/types' +import { FileUploadRecord } from '@/modules/fileuploads/helpers/types' import { SaveUploadFileInput } from '@/modules/fileuploads/repositories/fileUploads' import { Optional } from '@speckle/shared' @@ -11,17 +8,10 @@ export type GetFileInfo = (args: { export type SaveUploadFile = (args: SaveUploadFileInput) => Promise -export type UpdateUploadFile = (args: { - fileId: string - newStatus: FileUploadConvertedStatus -}) => Promise -export type UpdateFileStatusAndNotify = (params: { - streamId: string - branchName: string - fileId: string - newStatus: FileUploadConvertedStatus -}) => Promise +export type GarbageCollectPendingUploadedFiles = (args: { + timeoutThresholdSeconds: number +}) => Promise -export type GetAllPendingUploads = ( - options?: Partial<{ limit: number }> -) => Promise +export type NotifyChangeInFileStatus = (params: { + file: FileUploadRecord +}) => Promise diff --git a/packages/server/modules/fileuploads/index.ts b/packages/server/modules/fileuploads/index.ts index 34be61b2a..1c5e19cc2 100644 --- a/packages/server/modules/fileuploads/index.ts +++ b/packages/server/modules/fileuploads/index.ts @@ -1,5 +1,5 @@ import cron from 'node-cron' -import { updateUploadAndNotifyFactory } from '@/modules/fileuploads/services/management' +import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management' import { moduleLogger } from '@/observability/logging' import { onFileImportProcessedFactory, @@ -7,9 +7,8 @@ import { parseMessagePayload } from '@/modules/fileuploads/services/resultListener' import { - getAllPendingUploadsFactory, - getFileInfoFactory, - updateUploadFileFactory + expireOldPendingUploadsFactory, + getFileInfoFactory } from '@/modules/fileuploads/repositories/fileUploads' import { db } from '@/db/knex' import { publish } from '@/modules/shared/utils/subscriptions' @@ -46,10 +45,11 @@ const scheduleFileImportExpiry = async ({ for (const projectDb of [db, ...regionClients]) { fileImportExpiryHandlers.push( manageFileImportExpiryFactory({ - getPendingUploads: getAllPendingUploadsFactory({ db: projectDb }), - updateUploadStatus: updateUploadAndNotifyFactory({ + garbageCollectExpiredPendingUploads: expireOldPendingUploadsFactory({ + db: projectDb + }), + notifyUploadStatus: notifyChangeInFileStatus({ getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }), - updateUploadFile: updateUploadFileFactory({ db: projectDb }), publish }) }) diff --git a/packages/server/modules/fileuploads/repositories/fileUploads.ts b/packages/server/modules/fileuploads/repositories/fileUploads.ts index 9e25cb333..55b1648a1 100644 --- a/packages/server/modules/fileuploads/repositories/fileUploads.ts +++ b/packages/server/modules/fileuploads/repositories/fileUploads.ts @@ -1,9 +1,8 @@ import { Branches, FileUploads, knex } from '@/modules/core/dbSchema' import { - GetAllPendingUploads, + GarbageCollectPendingUploadedFiles, GetFileInfo, - SaveUploadFile, - UpdateUploadFile + SaveUploadFile } from '@/modules/fileuploads/domain/operations' import { FileUploadConvertedStatus, @@ -80,35 +79,27 @@ export const saveUploadFileFactory = return newRecord as FileUploadRecord } -export const updateUploadFileFactory = - (deps: { db: Knex }): UpdateUploadFile => - async ({ fileId, newStatus }) => { - const [updatedRecord] = await tables - .fileUploads(deps.db) - .where({ [FileUploads.col.id]: fileId }) +export const expireOldPendingUploadsFactory = + (deps: { db: Knex }): GarbageCollectPendingUploadedFiles => + async (params: { timeoutThresholdSeconds: number }) => { + const updatedRows = await deps + .db(FileUploads.name) + .whereIn(FileUploads.withoutTablePrefix.col.convertedStatus, [ + FileUploadConvertedStatus.Converting, + FileUploadConvertedStatus.Queued + ]) + .andWhere( + FileUploads.withoutTablePrefix.col.uploadDate, + '<', + deps.db.raw(`now() - interval '${params.timeoutThresholdSeconds} seconds'`) + ) .update({ - [FileUploads.withoutTablePrefix.col.convertedStatus]: newStatus + [FileUploads.withoutTablePrefix.col.convertedStatus]: + FileUploadConvertedStatus.Error }) .returning('*') - return updatedRecord - } - -export const getAllPendingUploadsFactory = - (deps: { db: Knex }): GetAllPendingUploads => - async (options) => { - const { limit } = options || {} - const q = tables - .fileUploads(deps.db) - .whereIn(FileUploads.col.convertedStatus, [ - FileUploadConvertedStatus.Queued, - FileUploadConvertedStatus.Converting - ]) - .orderBy(FileUploads.col.uploadDate, 'asc') - if (limit) { - q.limit(limit) - } - return await q + return updatedRows } const getPendingUploadsBaseQueryFactory = diff --git a/packages/server/modules/fileuploads/services/management.ts b/packages/server/modules/fileuploads/services/management.ts index 07ca83734..cd4a85562 100644 --- a/packages/server/modules/fileuploads/services/management.ts +++ b/packages/server/modules/fileuploads/services/management.ts @@ -6,8 +6,7 @@ import { } from '@/modules/core/graph/generated/graphql' import { SaveUploadFile, - UpdateFileStatusAndNotify, - UpdateUploadFile + NotifyChangeInFileStatus } from '@/modules/fileuploads/domain/operations' import { SaveUploadFileInput } from '@/modules/fileuploads/repositories/fileUploads' import { @@ -56,43 +55,43 @@ export const insertNewUploadAndNotifyFactory = }) } -export const updateUploadAndNotifyFactory = +export const notifyChangeInFileStatus = (deps: { getStreamBranchByName: GetStreamBranchByName - updateUploadFile: UpdateUploadFile publish: PublishSubscription - }): UpdateFileStatusAndNotify => + }): NotifyChangeInFileStatus => async (params) => { - const branch = await deps.getStreamBranchByName(params.streamId, params.branchName) - const file = await deps.updateUploadFile(params) + const { file } = params + const { id: fileId, streamId, branchName } = file + const branch = await deps.getStreamBranchByName(streamId, branchName) if (!branch) { await deps.publish(FileImportSubscriptions.ProjectPendingModelsUpdated, { projectPendingModelsUpdated: { - id: file.id, + id: fileId, type: ProjectPendingModelsUpdatedMessageType.Updated, model: file }, - projectId: file.streamId + projectId: streamId }) } else { await deps.publish(FileImportSubscriptions.ProjectPendingVersionsUpdated, { projectPendingVersionsUpdated: { - id: file.id, + id: fileId, type: ProjectPendingVersionsUpdatedMessageType.Updated, version: file }, - projectId: file.streamId, - branchName: file.branchName + projectId: streamId, + branchName }) } await deps.publish(FileImportSubscriptions.ProjectFileImportUpdated, { projectFileImportUpdated: { - id: file.id, + id: fileId, type: ProjectFileImportUpdatedMessageType.Created, upload: file }, - projectId: file.streamId + projectId: streamId }) } diff --git a/packages/server/modules/fileuploads/services/tasks.ts b/packages/server/modules/fileuploads/services/tasks.ts index 760a0f2d7..cce7aff9a 100644 --- a/packages/server/modules/fileuploads/services/tasks.ts +++ b/packages/server/modules/fileuploads/services/tasks.ts @@ -1,39 +1,24 @@ import { Logger } from '@/observability/logging' import { - GetAllPendingUploads, - UpdateFileStatusAndNotify + GarbageCollectPendingUploadedFiles, + NotifyChangeInFileStatus } from '@/modules/fileuploads/domain/operations' -import { FileUploadConvertedStatus } from '@/modules/fileuploads/helpers/types' export const manageFileImportExpiryFactory = (deps: { - getPendingUploads: GetAllPendingUploads - updateUploadStatus: UpdateFileStatusAndNotify + garbageCollectExpiredPendingUploads: GarbageCollectPendingUploadedFiles + notifyUploadStatus: NotifyChangeInFileStatus }) => { const {} = deps return async (params: { logger: Logger; timeoutThresholdSeconds: number }) => { - const { logger } = params - const now = new Date().getTime() - logger.info('Managing file import expiry') - // Logic to manage file import expiry goes here - // check for expired file imports - // if over some timeout threshold, move them into an error state - // and notify the user - const pendingUploads = await deps.getPendingUploads({ - limit: 100 + const { logger, timeoutThresholdSeconds } = params + const updatedUploads = await deps.garbageCollectExpiredPendingUploads({ + timeoutThresholdSeconds }) - logger.info(`Found ${pendingUploads.length} pending uploads`) - for (const upload of pendingUploads) { - const uploadDate = new Date(upload.uploadDate).getTime() - const diff = now - uploadDate - if (diff > 1000 * params.timeoutThresholdSeconds) { - logger.info(`Marking upload ${upload.id} as error`) - await deps.updateUploadStatus({ - streamId: upload.streamId, - branchName: upload.branchName, - fileId: upload.id, - newStatus: FileUploadConvertedStatus.Error - }) - } + logger.info(`Expired ${updatedUploads.length} pending uploads`) + for (const upload of updatedUploads) { + await deps.notifyUploadStatus({ + file: upload + }) } } } diff --git a/packages/server/modules/fileuploads/tests/fileuploads.spec.ts b/packages/server/modules/fileuploads/tests/fileuploads.spec.ts index 3d4c7b96f..45257347a 100644 --- a/packages/server/modules/fileuploads/tests/fileuploads.spec.ts +++ b/packages/server/modules/fileuploads/tests/fileuploads.spec.ts @@ -45,14 +45,13 @@ import { sendEmail } from '@/modules/emails/services/sending' import { getServerInfoFactory } from '@/modules/core/repositories/server' import { manageFileImportExpiryFactory } from '@/modules/fileuploads/services/tasks' import { - getAllPendingUploadsFactory, + expireOldPendingUploadsFactory, getFileInfoFactory, - saveUploadFileFactory, - updateUploadFileFactory + saveUploadFileFactory } from '@/modules/fileuploads/repositories/fileUploads' import { insertNewUploadAndNotifyFactory, - updateUploadAndNotifyFactory + notifyChangeInFileStatus } from '@/modules/fileuploads/services/management' import { publish } from '@/modules/shared/utils/subscriptions' import { testLogger as logger } from '@/observability/logging' @@ -122,10 +121,11 @@ const createUser = createUserFactory({ }) const garbageCollector = manageFileImportExpiryFactory({ - getPendingUploads: getAllPendingUploadsFactory({ db }), - updateUploadStatus: updateUploadAndNotifyFactory({ + garbageCollectExpiredPendingUploads: expireOldPendingUploadsFactory({ + db + }), + notifyUploadStatus: notifyChangeInFileStatus({ getStreamBranchByName: getStreamBranchByNameFactory({ db }), - updateUploadFile: updateUploadFileFactory({ db }), publish }) })