diff --git a/packages/ifc-import-service/src/ifc_importer/job_processor.py b/packages/ifc-import-service/src/ifc_importer/job_processor.py index 36484e40c..783fdcce1 100644 --- a/packages/ifc-import-service/src/ifc_importer/job_processor.py +++ b/packages/ifc-import-service/src/ifc_importer/job_processor.py @@ -16,7 +16,7 @@ from specklepy.core.api.inputs.file_import_inputs import ( from specklepy.core.api.models import Version from ifc_importer.domain import FileimportPayload, JobStatus -from ifc_importer.repository import get_next_job, set_job_status, setup_connection +from ifc_importer.repository import get_next_job, return_job_to_queued, setup_connection IDLE_TIMEOUT = 1 @@ -125,7 +125,8 @@ async def job_processor(logger: structlog.stdlib.BoundLogger): ), ) ) - + # the server is responsible for moving successful jobs to the succeeded state + # mark it as succeeded so we do not enter any error handling routines on finalisation job_status = JobStatus.SUCCEEDED except TimeoutError as te: @@ -142,7 +143,9 @@ async def job_processor(logger: structlog.stdlib.BoundLogger): ex = e job_status = JobStatus.FAILED finally: - if job_status == JobStatus.FAILED: + if job_status == JobStatus.QUEUED: + await return_job_to_queued(connection, job_id) + elif job_status == JobStatus.FAILED: # we should be reporting the failure to the server logger.error("job processing failed", exc_info=ex) try: @@ -162,11 +165,13 @@ async def job_processor(logger: structlog.stdlib.BoundLogger): ), ) ) - # if the reporting of the failure does not succeed, we're requeueing - # unless we've reached the max attempts + # the server is responsible for moving failed jobs to the failed state, so the worker does not have to do anything further except Exception as ex: - if attempt >= job.max_attempt: - job_status = JobStatus.FAILED - else: - job_status = JobStatus.QUEUED - await set_job_status(connection, job_id, job_status) + logger.error("failed to report job failure", exc_info=ex) + await return_job_to_queued(connection, job_id) + # The client will not pick up a queued job if it now has exceeded max attempts. + # The server is responsible for moving queued jobs which have exceeded maximum attempts to failed status. + elif job_status == JobStatus.SUCCEEDED: + # do nothing + # we expect the job to already be marked as succeeded in the database by the server (when the worker reported the results back to the server) + continue diff --git a/packages/ifc-import-service/src/ifc_importer/repository.py b/packages/ifc-import-service/src/ifc_importer/repository.py index ab648c57f..0c0b1b66e 100644 --- a/packages/ifc-import-service/src/ifc_importer/repository.py +++ b/packages/ifc-import-service/src/ifc_importer/repository.py @@ -33,6 +33,7 @@ async def get_next_job(connection: Connection) -> FileimportJob | None: WHERE ( --queued job payload ->> 'fileType' = 'ifc' AND status = $2 + AND "attempt" < "maxAttempt" ) OR ( --timed job left on processing state payload ->> 'fileType' = 'ifc' @@ -55,6 +56,11 @@ async def get_next_job(connection: Connection) -> FileimportJob | None: return FileimportJob.model_validate(dict(job)) +async def return_job_to_queued(connection: Connection, job_id: str) -> None: + print(f"returning job: {job_id} to queued") + return await set_job_status(connection, job_id, JobStatus.QUEUED) + + async def set_job_status( connection: Connection, job_id: str, job_status: JobStatus ) -> None: diff --git a/packages/server/modules/backgroundjobs/domain.ts b/packages/server/modules/backgroundjobs/domain.ts index 5e60fcc82..d75eb47a6 100644 --- a/packages/server/modules/backgroundjobs/domain.ts +++ b/packages/server/modules/backgroundjobs/domain.ts @@ -1,9 +1,5 @@ import { z } from 'zod' -export const BackgroundJobType = { - FileImport: 'fileImport' -} as const - export const BackgroundJobStatus = { Queued: 'queued', Processing: 'processing', // this status does not exist in db @@ -14,11 +10,8 @@ export const BackgroundJobStatus = { export type BackgroundJobStatus = (typeof BackgroundJobStatus)[keyof typeof BackgroundJobStatus] -export type BackgroundJobType = - (typeof BackgroundJobType)[keyof typeof BackgroundJobType] - export const BackgroundJobPayload = z.object({ - jobType: z.nativeEnum(BackgroundJobType), + jobType: z.string(), payloadVersion: z.number() }) @@ -46,6 +39,16 @@ export type StoreBackgroundJob = (args: { export type GetBackgroundJob = (args: { jobId: string }) => Promise | null> +export type FailQueuedBackgroundJobsWhichExceedMaximumAttempts< + T extends BackgroundJobPayload = BackgroundJobPayload +> = (args: { originServerUrl: string; jobType: string }) => Promise[]> + +export type UpdateBackgroundJob = + (args: { + jobId: string + status: BackgroundJobStatus + }) => Promise | null> + export type GetBackgroundJobCount< T extends BackgroundJobPayload = BackgroundJobPayload > = (args: { diff --git a/packages/server/modules/backgroundjobs/repositories.ts b/packages/server/modules/backgroundjobs/repositories.ts index f3db48f95..5fd75fd4b 100644 --- a/packages/server/modules/backgroundjobs/repositories.ts +++ b/packages/server/modules/backgroundjobs/repositories.ts @@ -1,10 +1,15 @@ import type { Knex } from 'knex' +import type { + FailQueuedBackgroundJobsWhichExceedMaximumAttempts, + UpdateBackgroundJob +} from '@/modules/backgroundjobs/domain' import { type BackgroundJob, type BackgroundJobPayload, type GetBackgroundJob, type GetBackgroundJobCount, - type StoreBackgroundJob + type StoreBackgroundJob, + BackgroundJobStatus } from '@/modules/backgroundjobs/domain' import { buildTableHelper } from '@/modules/core/dbSchema' @@ -48,6 +53,48 @@ export const getBackgroundJobFactory = return job ?? null } +export const failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory = + ({ + db + }: { + db: Knex + }): FailQueuedBackgroundJobsWhichExceedMaximumAttempts => + async ({ jobType, originServerUrl }) => { + const query = tables + .backgroundJobs(db) + .where(BackgroundJobs.withoutTablePrefix.col.originServerUrl, originServerUrl) + .andWhere( + BackgroundJobs.withoutTablePrefix.col.status, + BackgroundJobStatus.Queued + ) + .andWhere(BackgroundJobs.withoutTablePrefix.col.jobType, jobType) + .andWhere( + BackgroundJobs.withoutTablePrefix.col.attempt, + '>=', + db.raw('"maxAttempt"') // camel-case requires the column name to be wrapped in double quotes + ) + .orderBy(BackgroundJobs.withoutTablePrefix.col.createdAt, 'desc') + .update({ + [BackgroundJobs.withoutTablePrefix.col.status]: BackgroundJobStatus.Failed + }) + .returning[]>('*') + + return await query + } + +export const updateBackgroundJobFactory = + ({ db }: { db: Knex }): UpdateBackgroundJob => + async ({ jobId, status }) => { + const query = tables + .backgroundJobs(db) + .update({ status }) + .where({ id: jobId }) + .returning('*') + const rows = await query + if (rows.length === 0) return null + return rows[0] + } + export const getBackgroundJobCountFactory = ({ db }: { db: Knex }): GetBackgroundJobCount => async ({ status, jobType, minAttempts }) => { diff --git a/packages/server/modules/backgroundjobs/tests/integration/repositories.spec.ts b/packages/server/modules/backgroundjobs/tests/integration/repositories.spec.ts index cec82f7ee..dfb12dd88 100644 --- a/packages/server/modules/backgroundjobs/tests/integration/repositories.spec.ts +++ b/packages/server/modules/backgroundjobs/tests/integration/repositories.spec.ts @@ -3,7 +3,8 @@ import { storeBackgroundJobFactory, getBackgroundJobFactory, BackgroundJobs, - getBackgroundJobCountFactory + getBackgroundJobCountFactory, + failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory } from '@/modules/backgroundjobs/repositories' import type { BackgroundJob, @@ -15,6 +16,31 @@ import { createRandomString } from '@/modules/core/helpers/testHelpers' const originServerUrl = 'http://example.org' +type TestJobPayload = BackgroundJobPayload & { + jobType: 'fileImport' + payloadVersion: 1 + testData: string +} + +const createTestJob = ( + overrides: Partial> = {} +): BackgroundJob => ({ + id: createRandomString(10), + jobType: 'fileImport', + payload: { + jobType: 'fileImport', + payloadVersion: 1, + testData: 'test-data-value' + }, + status: BackgroundJobStatus.Queued, + attempt: 0, + maxAttempt: 3, + timeoutMs: 30000, + createdAt: new Date(), + updatedAt: new Date(), + ...overrides +}) + describe('Background Jobs repositories @backgroundjobs', () => { const storeBackgroundJob = storeBackgroundJobFactory({ db, @@ -23,31 +49,6 @@ describe('Background Jobs repositories @backgroundjobs', () => { const getBackgroundJob = getBackgroundJobFactory({ db }) const getBackgroundJobCount = getBackgroundJobCountFactory({ db }) - type TestJobPayload = BackgroundJobPayload & { - jobType: 'fileImport' - payloadVersion: 1 - testData: string - } - - const createTestJob = ( - overrides: Partial> = {} - ): BackgroundJob => ({ - id: createRandomString(10), - jobType: 'fileImport', - payload: { - jobType: 'fileImport', - payloadVersion: 1, - testData: 'test-data-value' - }, - status: BackgroundJobStatus.Queued, - attempt: 0, - maxAttempt: 3, - timeoutMs: 30000, - createdAt: new Date(), - updatedAt: new Date(), - ...overrides - }) - beforeEach(async () => { // Clean up background jobs table await db(BackgroundJobs.name).del() @@ -171,4 +172,24 @@ describe('Background Jobs repositories @backgroundjobs', () => { expect(count).to.equal(1) }) }) + + describe('failQueuedBackgroundJobsWhichExceedMaximumAttempts', () => { + it('should fail queued background jobs that exceed maximum attempts', async () => { + const job = createTestJob({ + status: BackgroundJobStatus.Queued, + attempt: 2, + maxAttempt: 2 + }) + await storeBackgroundJob({ job }) + + const SUT = failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory({ + db + }) + + await SUT({ originServerUrl, jobType: 'fileImport' }) + + const updatedJob = await db(BackgroundJobs.name).where({ id: job.id }).first() + expect(updatedJob.status).to.equal(BackgroundJobStatus.Failed) + }) + }) }) diff --git a/packages/server/modules/fileuploads/domain/consts.ts b/packages/server/modules/fileuploads/domain/consts.ts index 10a9eb3fc..8e99c722e 100644 --- a/packages/server/modules/fileuploads/domain/consts.ts +++ b/packages/server/modules/fileuploads/domain/consts.ts @@ -5,3 +5,16 @@ export const FileUploadDatabaseEvents = { export const DelayBetweenFileImportRetriesMinutes = 5 export const NumberOfFileImportRetries = 5 +export const BackgroundJobType = { + FileImport: 'fileImport' +} as const + +export type BackgroundJobType = + (typeof BackgroundJobType)[keyof typeof BackgroundJobType] + +export const BackgroundJobPayloadVersion = { + v1: 1 +} as const + +export type BackgroundJobPayloadVersion = + (typeof BackgroundJobPayloadVersion)[keyof typeof BackgroundJobPayloadVersion] diff --git a/packages/server/modules/fileuploads/domain/operations.ts b/packages/server/modules/fileuploads/domain/operations.ts index 7cba65e0a..7981521ef 100644 --- a/packages/server/modules/fileuploads/domain/operations.ts +++ b/packages/server/modules/fileuploads/domain/operations.ts @@ -58,6 +58,10 @@ export type GarbageCollectPendingUploadedFiles = (args: { timeoutThresholdSeconds: number }) => Promise +export type FailPendingUploadedFiles = (args: { + uploadIds: string[] +}) => Promise + export type NotifyChangeInFileStatus = (params: { file: FileUploadRecord }) => Promise diff --git a/packages/server/modules/fileuploads/graph/resolvers/fileUploads.ts b/packages/server/modules/fileuploads/graph/resolvers/fileUploads.ts index a19251932..e652e09df 100644 --- a/packages/server/modules/fileuploads/graph/resolvers/fileUploads.ts +++ b/packages/server/modules/fileuploads/graph/resolvers/fileUploads.ts @@ -26,6 +26,7 @@ import { import { throwIfAuthNotOk } from '@/modules/shared/helpers/errorHelper' import { fileImportServiceShouldUsePrivateObjectsServerUrl, + getFileImporterQueuePostgresUrl, getFileUploadUrlExpiryMinutes, getPrivateObjectsServerOrigin, getServerOrigin, @@ -75,6 +76,8 @@ import { onFileImportResultFactory } from '@/modules/fileuploads/services/result import type { FileImportResultPayload } from '@speckle/shared/workers/fileimport' import { JobResultStatus } from '@speckle/shared/workers/fileimport' import type { GraphQLContext } from '@/modules/shared/helpers/typeHelper' +import { updateBackgroundJobFactory } from '@/modules/backgroundjobs/repositories' +import { configureClient } from '@/knexfile' const { FF_NEXT_GEN_FILE_IMPORTER_ENABLED } = getFeatureFlags() @@ -102,6 +105,11 @@ const getFileUploadModel = async (params: { return null } +const fileImporterConnectionUri = getFileImporterQueuePostgresUrl() +const queueDb = fileImporterConnectionUri + ? configureClient({ postgres: { connectionUri: fileImporterConnectionUri } }).public + : db + const fileUploadMutations: Resolvers['FileUploadMutations'] = { async generateUploadUrl(_parent, args, ctx) { const { projectId } = args.input @@ -294,7 +302,11 @@ const fileUploadMutations: Resolvers['FileUploadMutations'] = { logger: logger.child({ fileUploadStatus: status }), updateFileUpload: updateFileUploadFactory({ db: projectDb }), getFileInfo: getFileInfoFactoryV2({ db: projectDb }), - eventEmit: getEventBus().emit + updateBackgroundJob: updateBackgroundJobFactory({ + db: queueDb + }), + eventEmit: getEventBus().emit, + FF_NEXT_GEN_FILE_IMPORTER_ENABLED }) await onFileImportResult({ diff --git a/packages/server/modules/fileuploads/index.ts b/packages/server/modules/fileuploads/index.ts index d68caccc3..30589e8e4 100644 --- a/packages/server/modules/fileuploads/index.ts +++ b/packages/server/modules/fileuploads/index.ts @@ -1,5 +1,4 @@ import type cron from 'node-cron' -import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management' import { moduleLogger } from '@/observability/logging' import { onFileImportProcessedFactory, @@ -21,26 +20,16 @@ import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { listenFor } from '@/modules/core/utils/dbNotificationListener' import { getEventBus } from '@/modules/shared/services/eventBus' import { - expireOldPendingUploadsFactory, getFileInfoFactory, updateFileUploadFactory } from '@/modules/fileuploads/repositories/fileUploads' import { db } from '@/db/knex' -import { getFileImportTimeLimitMinutes } from '@/modules/shared/helpers/envHelper' -import { getRegisteredDbClients } from '@/modules/multiregion/utils/dbSelector' import { scheduleExecutionFactory } from '@/modules/core/services/taskScheduler' import { acquireTaskLockFactory, releaseTaskLockFactory } from '@/modules/core/repositories/scheduledTasks' -import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations' -import { manageFileImportExpiryFactory } from '@/modules/fileuploads/services/tasks' -import { TIME } from '@speckle/shared' -import { - DelayBetweenFileImportRetriesMinutes, - FileUploadDatabaseEvents, - NumberOfFileImportRetries -} from '@/modules/fileuploads/domain/consts' +import { FileUploadDatabaseEvents } from '@/modules/fileuploads/domain/consts' import { fileuploadRouterFactory } from '@/modules/fileuploads/rest/router' import { shutdownQueues, @@ -53,54 +42,15 @@ import { reportSubscriptionEventsFactory } from '@/modules/fileuploads/events/su import { configureClient } from '@/knexfile' import { MisconfiguredEnvironmentError } from '@/modules/shared/errors' import { rhinoImporterSupportedFileExtensions } from '@speckle/shared/blobs' +import { scheduleFileImportExpiry } from '@/modules/fileuploads/tasks/expireFileImports' +import { scheduleBackgroundJobGarbageCollection } from '@/modules/fileuploads/tasks/garbageCollectBackgroundJobs' const { FF_NEXT_GEN_FILE_IMPORTER_ENABLED, FF_RHINO_FILE_IMPORTER_ENABLED } = getFeatureFlags() -let scheduledTasks: cron.ScheduledTask[] = [] +const EveryMinute = '*/1 * * * *' -const scheduleFileImportExpiry = async ({ - scheduleExecution -}: { - scheduleExecution: ScheduleExecution -}) => { - const fileImportExpiryHandlers: ReturnType[] = - [] - const regionClients = await getRegisteredDbClients() - for (const projectDb of [db, ...regionClients]) { - fileImportExpiryHandlers.push( - manageFileImportExpiryFactory({ - garbageCollectExpiredPendingUploads: expireOldPendingUploadsFactory({ - db: projectDb - }), - notifyUploadStatus: notifyChangeInFileStatus({ - eventEmit: getEventBus().emit - }) - }) - ) - } - - const cronExpression = '*/5 * * * *' // every 5 minutes - return scheduleExecution( - cronExpression, - 'FileImportExpiry', - async (_scheduledTime, { logger }) => { - await Promise.all( - fileImportExpiryHandlers.map((handler) => - handler({ - logger, - timeoutThresholdSeconds: - (NumberOfFileImportRetries * - (getFileImportTimeLimitMinutes() + - DelayBetweenFileImportRetriesMinutes) + - 1) * // additional buffer of 1 minute - TIME.minute - }) - ) - ) - } - ) -} +const scheduledTasks: cron.ScheduledTask[] = [] export const init: SpeckleModule['init'] = async ({ app, @@ -116,10 +66,14 @@ export const init: SpeckleModule['init'] = async ({ if (FF_NEXT_GEN_FILE_IMPORTER_ENABLED) moduleLogger.info('📄 Next Gen File Importer is ENABLED') + const scheduleExecution = scheduleExecutionFactory({ + acquireTaskLock: acquireTaskLockFactory({ db }), + releaseTaskLock: releaseTaskLockFactory({ db }) + }) + let observeResult: ObserveResult | undefined = undefined if (isInitial) { - // this feature flag is going away soon if (FF_NEXT_GEN_FILE_IMPORTER_ENABLED) { moduleLogger.info('🗳️ Next Gen File importer is ENABLED') const connectionUri = getFileImporterQueuePostgresUrl() @@ -152,14 +106,22 @@ export const init: SpeckleModule['init'] = async ({ registers: [metricsRegister], requestQueues })) + + scheduledTasks.push( + await scheduleBackgroundJobGarbageCollection({ + queueDb, + scheduleExecution, + cronExpression: EveryMinute + }) + ) } - const scheduleExecution = scheduleExecutionFactory({ - acquireTaskLock: acquireTaskLockFactory({ db }), - releaseTaskLock: releaseTaskLockFactory({ db }) - }) - - scheduledTasks = [await scheduleFileImportExpiry({ scheduleExecution })] + scheduledTasks.push( + await scheduleFileImportExpiry({ + scheduleExecution, + cronExpression: EveryMinute + }) + ) await listenFor(FileUploadDatabaseEvents.Updated, async (msg) => { const parsedMessage = parseMessagePayload(msg.payload) @@ -201,7 +163,6 @@ export const init: SpeckleModule['init'] = async ({ })() } - // the two routers can be used independently and can both be enabled app.use(fileuploadRouterFactory()) } diff --git a/packages/server/modules/fileuploads/queues/fileimports.ts b/packages/server/modules/fileuploads/queues/fileimports.ts index ef2293426..d0ae00017 100644 --- a/packages/server/modules/fileuploads/queues/fileimports.ts +++ b/packages/server/modules/fileuploads/queues/fileimports.ts @@ -8,7 +8,9 @@ import type { JobPayload } from '@speckle/shared/workers/fileimport' import type { FileImportQueue } from '@/modules/fileuploads/domain/types' import { NumberOfFileImportRetries, - DelayBetweenFileImportRetriesMinutes + DelayBetweenFileImportRetriesMinutes, + BackgroundJobType, + BackgroundJobPayloadVersion } from '@/modules/fileuploads/domain/consts' import type { Knex } from 'knex' import { migrateDbToLatest } from '@/db/migrations' @@ -17,7 +19,7 @@ import { getBackgroundJobCountFactory, storeBackgroundJobFactory } from '@/modules/backgroundjobs/repositories' -import { BackgroundJobStatus, BackgroundJobType } from '@/modules/backgroundjobs/domain' +import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain' export const fileImportQueues: FileImportQueue[] = [] @@ -55,7 +57,11 @@ export const initializePostgresQueue = async ({ shutdown: async () => {}, scheduleJob: async (jobData: JobPayload) => { await createBackgroundJob({ - jobPayload: { jobType: 'fileImport', payloadVersion: 1, ...jobData } + jobPayload: { + jobType: BackgroundJobType.FileImport, + payloadVersion: BackgroundJobPayloadVersion.v1, + ...jobData + } }) }, metrics: { diff --git a/packages/server/modules/fileuploads/repositories/fileUploads.ts b/packages/server/modules/fileuploads/repositories/fileUploads.ts index c5cc50113..0758db277 100644 --- a/packages/server/modules/fileuploads/repositories/fileUploads.ts +++ b/packages/server/modules/fileuploads/repositories/fileUploads.ts @@ -11,7 +11,8 @@ import type { GetModelUploadsItems, GetModelUploadsBaseArgs, GetModelUploadsTotalCount, - UpdateFileStatus + UpdateFileStatus, + FailPendingUploadedFiles } from '@/modules/fileuploads/domain/operations' import type { FileUploadRecord, @@ -182,6 +183,26 @@ export const expireOldPendingUploadsFactory = return updatedRows } +export const failPendingUploadedFilesFactory = + (deps: { db: Knex }): FailPendingUploadedFiles => + async (params) => { + const updatedRows = await deps + .db(FileUploads.name) + .whereIn(FileUploads.withoutTablePrefix.col.id, params.uploadIds) + .whereIn(FileUploads.withoutTablePrefix.col.convertedStatus, [ + FileUploadConvertedStatus.Queued, + FileUploadConvertedStatus.Converting + ]) + .update({ + [FileUploads.withoutTablePrefix.col.convertedStatus]: + FileUploadConvertedStatus.Error, + [FileUploads.withoutTablePrefix.col.convertedMessage]: 'File import job failed', + [FileUploads.withoutTablePrefix.col.convertedLastUpdate]: deps.db.fn.now() + }) + .returning('*') + return updatedRows + } + const getPendingUploadsBaseQueryFactory = (deps: { db: Knex }) => (streamId: string, options?: Partial<{ ignoreOld: boolean; limit: number }>) => { diff --git a/packages/server/modules/fileuploads/services/resultHandler.ts b/packages/server/modules/fileuploads/services/resultHandler.ts index aeb71649a..8766424d6 100644 --- a/packages/server/modules/fileuploads/services/resultHandler.ts +++ b/packages/server/modules/fileuploads/services/resultHandler.ts @@ -13,12 +13,19 @@ import type { FileUploadRecord } from '@/modules/fileuploads/helpers/types' import { FileImportJobNotFoundError } from '@/modules/fileuploads/helpers/errors' import type { EventBusEmit } from '@/modules/shared/services/eventBus' import { FileuploadEvents } from '@/modules/fileuploads/domain/events' +import { + BackgroundJobStatus, + type UpdateBackgroundJob +} from '@/modules/backgroundjobs/domain' +import { JobResultStatus } from '@speckle/shared/workers/fileimport' type OnFileImportResultDeps = { getFileInfo: GetFileInfoV2 updateFileUpload: UpdateFileUpload + updateBackgroundJob: UpdateBackgroundJob eventEmit: EventBusEmit logger: Logger + FF_NEXT_GEN_FILE_IMPORTER_ENABLED: boolean } export const onFileImportResultFactory = @@ -46,8 +53,10 @@ export const onFileImportResultFactory = }) let convertedCommitId = null + let newStatusForBackgroundJob: BackgroundJobStatus = BackgroundJobStatus.Processing + switch (jobResult.status) { - case 'error': + case JobResultStatus.Error: boundLogger.warn( { duration: jobResult.result.durationSeconds, @@ -55,9 +64,11 @@ export const onFileImportResultFactory = }, 'Processing error result for file upload' ) + newStatusForBackgroundJob = BackgroundJobStatus.Failed break - case 'success': + case JobResultStatus.Success: convertedCommitId = jobResult.result.versionId + newStatusForBackgroundJob = BackgroundJobStatus.Succeeded boundLogger.info( { duration: jobResult.result.durationSeconds, @@ -71,6 +82,23 @@ export const onFileImportResultFactory = const status = jobResultStatusToFileUploadStatus(jobResult.status) const convertedMessage = jobResultToConvertedMessage(jobResult) + if (deps.FF_NEXT_GEN_FILE_IMPORTER_ENABLED) { + try { + await deps.updateBackgroundJob({ + jobId, + status: newStatusForBackgroundJob + }) + } catch (e) { + const err = ensureError(e) + logger.error( + { err }, + 'Error updating background job status in database. Job ID: %s', + jobId + ) + throw err + } + } + let updatedFile: FileUploadRecord try { updatedFile = await deps.updateFileUpload({ diff --git a/packages/server/modules/fileuploads/services/tasks.ts b/packages/server/modules/fileuploads/services/tasks.ts index c3ea716d4..adb47e0db 100644 --- a/packages/server/modules/fileuploads/services/tasks.ts +++ b/packages/server/modules/fileuploads/services/tasks.ts @@ -1,8 +1,13 @@ import type { Logger } from '@/observability/logging' import type { + FailPendingUploadedFiles, GarbageCollectPendingUploadedFiles, NotifyChangeInFileStatus } from '@/modules/fileuploads/domain/operations' +import type { FailQueuedBackgroundJobsWhichExceedMaximumAttempts } from '@/modules/backgroundjobs/domain' +import type { FileImportJobPayloadV1 } from '@speckle/shared/workers/fileimport' +import { BackgroundJobType } from '@/modules/fileuploads/domain/consts' +import { LogicError } from '@/modules/shared/errors' export const manageFileImportExpiryFactory = (deps: { garbageCollectExpiredPendingUploads: GarbageCollectPendingUploadedFiles @@ -22,3 +27,53 @@ export const manageFileImportExpiryFactory = (deps: { } } } + +export const garbageCollectAttemptedFileImportBackgroundJobsFactory = (deps: { + failQueuedBackgroundJobsWhichExceedMaximumAttempts: FailQueuedBackgroundJobsWhichExceedMaximumAttempts + failPendingUploadedFiles: FailPendingUploadedFiles + notifyUploadStatus: NotifyChangeInFileStatus +}): ((params: { logger: Logger; originServerUrl: string }) => Promise) => { + const { + failQueuedBackgroundJobsWhichExceedMaximumAttempts, + failPendingUploadedFiles, + notifyUploadStatus + } = deps + return async (params) => { + const { logger, originServerUrl } = params + + const failedBackgroundJobs = + await failQueuedBackgroundJobsWhichExceedMaximumAttempts({ + originServerUrl, + jobType: BackgroundJobType.FileImport + }) + logger.info( + `Found ${failedBackgroundJobs.length} background jobs which have exceeded maximum number of attempts` + ) + + if (failedBackgroundJobs.length === 0) { + return + } + + const fileIds = failedBackgroundJobs.map((job) => job.payload.blobId) + if (fileIds.length !== failedBackgroundJobs.length || fileIds.some((id) => !id)) { + throw new LogicError( + 'We do not have a valid file Id for all failed background jobs', + { + info: { + fileIds + } + } + ) + } + + const updatedUploads = await failPendingUploadedFiles({ + uploadIds: fileIds + }) + + for (const upload of updatedUploads) { + await notifyUploadStatus({ + file: upload + }) + } + } +} diff --git a/packages/server/modules/fileuploads/tasks/expireFileImports.ts b/packages/server/modules/fileuploads/tasks/expireFileImports.ts new file mode 100644 index 000000000..1b9443b44 --- /dev/null +++ b/packages/server/modules/fileuploads/tasks/expireFileImports.ts @@ -0,0 +1,57 @@ +import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management' +import { expireOldPendingUploadsFactory } from '@/modules/fileuploads/repositories/fileUploads' +import { db } from '@/db/knex' +import { getFileImportTimeLimitMinutes } from '@/modules/shared/helpers/envHelper' +import { getRegisteredDbClients } from '@/modules/multiregion/utils/dbSelector' +import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations' +import { manageFileImportExpiryFactory } from '@/modules/fileuploads/services/tasks' +import { TIME } from '@speckle/shared' +import { + DelayBetweenFileImportRetriesMinutes, + NumberOfFileImportRetries +} from '@/modules/fileuploads/domain/consts' +import { getEventBus } from '@/modules/shared/services/eventBus' + +export const scheduleFileImportExpiry = async ({ + scheduleExecution, + cronExpression +}: { + scheduleExecution: ScheduleExecution + cronExpression: string +}) => { + const fileImportExpiryHandlers: ReturnType[] = + [] + const regionClients = await getRegisteredDbClients() + for (const projectDb of [db, ...regionClients]) { + fileImportExpiryHandlers.push( + manageFileImportExpiryFactory({ + garbageCollectExpiredPendingUploads: expireOldPendingUploadsFactory({ + db: projectDb + }), + notifyUploadStatus: notifyChangeInFileStatus({ + eventEmit: getEventBus().emit + }) + }) + ) + } + + return scheduleExecution( + cronExpression, + 'FileImportExpiry', + async (_scheduledTime, { logger }) => { + await Promise.all( + fileImportExpiryHandlers.map((handler) => + handler({ + logger, + timeoutThresholdSeconds: + (NumberOfFileImportRetries * + (getFileImportTimeLimitMinutes() + + DelayBetweenFileImportRetriesMinutes) + + 1) * // additional buffer of 1 minute + TIME.minute + }) + ) + ) + } + ) +} diff --git a/packages/server/modules/fileuploads/tasks/garbageCollectBackgroundJobs.ts b/packages/server/modules/fileuploads/tasks/garbageCollectBackgroundJobs.ts new file mode 100644 index 000000000..685d542ca --- /dev/null +++ b/packages/server/modules/fileuploads/tasks/garbageCollectBackgroundJobs.ts @@ -0,0 +1,54 @@ +import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management' +import { db } from '@/db/knex' +import { getRegisteredDbClients } from '@/modules/multiregion/utils/dbSelector' +import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations' +import { getEventBus } from '@/modules/shared/services/eventBus' +import { garbageCollectAttemptedFileImportBackgroundJobsFactory } from '@/modules/fileuploads/services/tasks' +import { failPendingUploadedFilesFactory } from '@/modules/fileuploads/repositories/fileUploads' +import { failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory } from '@/modules/backgroundjobs/repositories' +import type { Knex } from 'knex' +import { getServerOrigin } from '@/modules/shared/helpers/envHelper' + +export const scheduleBackgroundJobGarbageCollection = async ({ + queueDb, + scheduleExecution, + cronExpression +}: { + queueDb: Knex + scheduleExecution: ScheduleExecution + cronExpression: string +}) => { + const perDbTask: ReturnType< + typeof garbageCollectAttemptedFileImportBackgroundJobsFactory + >[] = [] + const regionClients = await getRegisteredDbClients() + for (const projectDb of [db, ...regionClients]) { + perDbTask.push( + garbageCollectAttemptedFileImportBackgroundJobsFactory({ + failQueuedBackgroundJobsWhichExceedMaximumAttempts: + failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory({ + db: queueDb + }), + failPendingUploadedFiles: failPendingUploadedFilesFactory({ db: projectDb }), + notifyUploadStatus: notifyChangeInFileStatus({ + eventEmit: getEventBus().emit + }) + }) + ) + } + + return scheduleExecution( + cronExpression, + 'GarbageCollectBackgroundJobs', + async (_scheduledTime, { logger }) => { + await Promise.all( + perDbTask.map((task) => + task({ + logger, + originServerUrl: getServerOrigin() + }) + ) + ) + } + ) +} diff --git a/packages/server/modules/fileuploads/tests/integration/tasks.spec.ts b/packages/server/modules/fileuploads/tests/integration/tasks.spec.ts new file mode 100644 index 000000000..0504f3f0d --- /dev/null +++ b/packages/server/modules/fileuploads/tests/integration/tasks.spec.ts @@ -0,0 +1,223 @@ +import { expect } from 'chai' +import { garbageCollectAttemptedFileImportBackgroundJobsFactory } from '@/modules/fileuploads/services/tasks' +import { + BackgroundJobs, + failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory, + getBackgroundJobFactory, + storeBackgroundJobFactory +} from '@/modules/backgroundjobs/repositories' +import { db } from '@/db/knex' +import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management' +import { getEventBus } from '@/modules/shared/services/eventBus' +import { + failPendingUploadedFilesFactory, + getFileInfoFactoryV2, + saveUploadFileFactoryV2 +} from '@/modules/fileuploads/repositories/fileUploads' +import { testLogger } from '@/observability/logging' +import { + type BackgroundJobPayload, + BackgroundJobStatus, + type BackgroundJob +} from '@/modules/backgroundjobs/domain' +import type { FileImportJobPayloadV1 } from '@speckle/shared/workers/fileimport' +import cryptoRandomString from 'crypto-random-string' +import type { FileUploadRecordV2 } from '@/modules/fileuploads/helpers/types' +import { + type BasicTestStream, + createTestStream +} from '@/test/speckle-helpers/streamHelper' +import { type BasicTestUser, createTestUser } from '@/test/authHelper' +import type { BasicTestBranch } from '@/test/speckle-helpers/branchHelper' +import { createTestBranch } from '@/test/speckle-helpers/branchHelper' +import { FileUploadConvertedStatus } from '@speckle/shared/blobs' + +const originServerUrl = 'https://example.org' + +export type TestJobPayload = BackgroundJobPayload & { + jobType: 'fileImport' + payloadVersion: 1 + blobId: string + testData: string +} + +export const createTestJob = ( + overrides: Partial> = {} +): BackgroundJob => ({ + id: cryptoRandomString({ length: 10 }), + jobType: 'fileImport', + payload: { + jobType: 'fileImport', + payloadVersion: 1, + blobId: cryptoRandomString({ length: 10 }), + testData: 'test-data-value' + }, + status: BackgroundJobStatus.Queued, + attempt: 0, + maxAttempt: 3, + timeoutMs: 30000, + createdAt: new Date(), + updatedAt: new Date(), + ...overrides +}) + +const createTestJobPayload = (overrides: Partial): TestJobPayload => ({ + jobType: 'fileImport', + payloadVersion: 1, + blobId: cryptoRandomString({ length: 10 }), + testData: cryptoRandomString({ length: 100 }), + ...overrides +}) + +const createTestFileUpload = ( + overrides: Partial< + Pick< + FileUploadRecordV2, + 'projectId' | 'userId' | 'fileName' | 'fileType' | 'fileSize' + > & { fileId: string; modelId: string; modelName: string } + > +) => { + return { + projectId: cryptoRandomString({ length: 10 }), + userId: cryptoRandomString({ length: 10 }), + fileName: cryptoRandomString({ length: 10 }), + fileType: cryptoRandomString({ length: 10 }), + fileSize: Math.floor(Math.random() * 10_000), + fileId: cryptoRandomString({ length: 10 }), + modelId: cryptoRandomString({ length: 10 }), + modelName: cryptoRandomString({ length: 10 }), + ...overrides + } +} + +type StoredBackgroundJob = BackgroundJob & { + originServerUrl: string +} + +describe('File import garbage collection @fileuploads integration', () => { + const storeBackgroundJob = storeBackgroundJobFactory({ + db, + originServerUrl + }) + const getBackgroundJob = getBackgroundJobFactory({ db }) + const saveUploadFile = saveUploadFileFactoryV2({ db }) + const getUploadFile = getFileInfoFactoryV2({ db }) + + let userOne: BasicTestUser + let projectOne: BasicTestStream + let modelOne: BasicTestBranch + + before(async () => { + userOne = await createTestUser({ + id: '', + email: cryptoRandomString({ length: 10 }) + '@example.org', + name: cryptoRandomString({ length: 10 }) + }) + projectOne = await createTestStream( + { + id: '', + name: cryptoRandomString({ length: 10 }), + ownerId: userOne.id + }, + userOne + ) + modelOne = await createTestBranch({ + branch: { + id: '', + name: cryptoRandomString({ length: 10 }), + authorId: userOne.id, + streamId: projectOne.id + }, + stream: projectOne, + owner: userOne + }) + }) + + beforeEach(async () => { + // Clean up background jobs table + await db(BackgroundJobs.name).del() + }) + + describe('garbage collect file import background jobs', () => { + const SUT = garbageCollectAttemptedFileImportBackgroundJobsFactory({ + failQueuedBackgroundJobsWhichExceedMaximumAttempts: + failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory({ + db + }), + failPendingUploadedFiles: failPendingUploadedFilesFactory({ + db + }), + notifyUploadStatus: notifyChangeInFileStatus({ + eventEmit: getEventBus().emit + }) + }) + + it('should garbage collect failed background jobs', async () => { + const identifiableData = cryptoRandomString({ length: 10 }) + + const fileOne = createTestFileUpload({ + projectId: projectOne.id, + modelId: modelOne.id + }) + const processingJobAtMaxAttempts = createTestJob({ + status: BackgroundJobStatus.Processing, + payload: createTestJobPayload({ + testData: identifiableData, + blobId: fileOne.fileId + }), + attempt: 3, + maxAttempt: 3 + }) + + const fileTwo = createTestFileUpload({ + projectId: projectOne.id, + modelId: modelOne.id + }) + const queuedJobAtMaxAttempts = createTestJob({ + status: BackgroundJobStatus.Queued, + payload: createTestJobPayload({ + testData: identifiableData, + blobId: fileTwo.fileId + }), + attempt: 3, + maxAttempt: 3 + }) + await saveUploadFile(fileOne) + await storeBackgroundJob({ job: processingJobAtMaxAttempts }) + await saveUploadFile(fileTwo) + await storeBackgroundJob({ job: queuedJobAtMaxAttempts }) + + // ensure jobs are in the database and retrievable + const existing = await db(BackgroundJobs.name) + .whereJsonSupersetOf(BackgroundJobs.withoutTablePrefix.col.payload, { + testData: identifiableData + }) + .select('*') + expect(existing).to.have.length(2) + expect( + existing.filter((j) => j.status === BackgroundJobStatus.Queued), + JSON.stringify(existing) + ).to.have.length(1) + expect( + existing.filter((j) => j.status === BackgroundJobStatus.Processing), + JSON.stringify(existing) + ).to.have.length(1) + + await SUT({ logger: testLogger, originServerUrl }) + + // processing job should not have been garbage collected + const resultOne = await getBackgroundJob({ jobId: processingJobAtMaxAttempts.id }) + expect(resultOne?.status).to.equal(BackgroundJobStatus.Processing) + + // queued job should have been garbage collected + const resultTwo = await getBackgroundJob({ jobId: queuedJobAtMaxAttempts.id }) + expect(resultTwo?.status).to.equal(BackgroundJobStatus.Failed) + + const fileOneResult = await getUploadFile({ fileId: fileOne.fileId }) + const fileTwoResult = await getUploadFile({ fileId: fileTwo.fileId }) + + expect(fileOneResult?.convertedStatus).to.equal(FileUploadConvertedStatus.Queued) + expect(fileTwoResult?.convertedStatus).to.equal(FileUploadConvertedStatus.Error) + }) + }) +}) diff --git a/packages/shared/src/workers/fileimport/job.ts b/packages/shared/src/workers/fileimport/job.ts index 786705d42..17294ea86 100644 --- a/packages/shared/src/workers/fileimport/job.ts +++ b/packages/shared/src/workers/fileimport/job.ts @@ -68,3 +68,8 @@ export const fileImportResultPayload = z.discriminatedUnion('status', [ ]) export type FileImportResultPayload = z.infer + +export type FileImportJobPayloadV1 = JobPayload & { + jobType: 'fileImport' + payloadVersion: 1 +}