From 79ef564e1b6f9dce2f01df738a8fd223f3d01d67 Mon Sep 17 00:00:00 2001 From: Iain Sproat <68657+iainsproat@users.noreply.github.com> Date: Mon, 8 Sep 2025 16:26:41 +0100 Subject: [PATCH] feat(server/fileuploads): background jobs draws down a compute budget (#5349) --- .pre-commit-config.yaml | 3 +- .../src/ifc_importer/domain.py | 1 + .../src/ifc_importer/job_processor.py | 56 +++++++----- .../src/ifc_importer/repository.py | 54 ++++++++++-- .../fileuploads/typedefs/fileuploads.graphql | 4 + .../{domain.ts => domain/types.ts} | 8 +- .../20250904122458_compute_budget.ts | 18 ++++ .../backgroundjobs.ts} | 85 ++++++++++--------- .../modules/backgroundjobs/services/create.ts | 4 +- .../tests/integration/repositories.spec.ts | 62 ++++++++++++-- .../tests/unit/services.spec.ts | 10 ++- .../modules/fileuploads/domain/consts.ts | 13 ++- .../modules/fileuploads/domain/operations.ts | 10 +-- .../graph/resolvers/fileUploads.ts | 8 +- packages/server/modules/fileuploads/index.ts | 70 +++++++-------- .../modules/fileuploads/queues/fileimports.ts | 28 +++--- .../fileuploads/services/createFileImport.ts | 18 ++-- .../fileuploads/services/management.ts | 1 - .../fileuploads/services/resultHandler.ts | 33 +++---- .../modules/fileuploads/services/tasks.ts | 38 ++++----- .../fileuploads/tasks/expireFileImports.ts | 14 +-- .../tasks/garbageCollectBackgroundJobs.ts | 12 +-- .../fileuploads/tests/helpers/creation.ts | 3 +- .../tests/integration/tasks.spec.ts | 22 ++--- .../tests/unit/fileuploads.spec.ts | 5 +- packages/shared/src/workers/fileimport/job.ts | 34 ++++---- 26 files changed, 368 insertions(+), 246 deletions(-) rename packages/server/modules/backgroundjobs/{domain.ts => domain/types.ts} (88%) create mode 100644 packages/server/modules/backgroundjobs/migrations/20250904122458_compute_budget.ts rename packages/server/modules/backgroundjobs/{repositories.ts => repositories/backgroundjobs.ts} (50%) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 546d6c441..bfbae7b60 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -29,7 +29,8 @@ repos: name: ruff checker (python linter) args: - --ignore=E501 # ignoring error about lines that are too long - # Run the linter. + # - --fix # uncomment to automatically fix linting errors + # Run the linter. - id: ruff-format name: ruff formatter (python formatter) diff --git a/packages/ifc-import-service/src/ifc_importer/domain.py b/packages/ifc-import-service/src/ifc_importer/domain.py index 98a03033b..500062b6b 100644 --- a/packages/ifc-import-service/src/ifc_importer/domain.py +++ b/packages/ifc-import-service/src/ifc_importer/domain.py @@ -47,3 +47,4 @@ class FileimportJob(JobBase): max_attempt: int created_at: datetime updated_at: datetime + remaining_compute_budget_seconds: int diff --git a/packages/ifc-import-service/src/ifc_importer/job_processor.py b/packages/ifc-import-service/src/ifc_importer/job_processor.py index 2708ca6d8..a2ad173af 100644 --- a/packages/ifc-import-service/src/ifc_importer/job_processor.py +++ b/packages/ifc-import-service/src/ifc_importer/job_processor.py @@ -1,6 +1,7 @@ import asyncio import tempfile import time +from math import floor from pathlib import Path import structlog @@ -16,7 +17,12 @@ from specklepy.core.api.inputs.file_import_inputs import ( from specklepy.core.api.models import Version from ifc_importer.domain import FileimportPayload, JobStatus -from ifc_importer.repository import get_next_job, return_job_to_queued, setup_connection +from ifc_importer.repository import ( + deduct_from_compute_budget, + get_next_job, + return_job_to_queued, + setup_connection, +) IDLE_TIMEOUT = 1 @@ -82,6 +88,11 @@ async def job_processor(logger: structlog.stdlib.BoundLogger): continue start = time.time() + duration = 0 + job_timeout = max( + 0, min(job.payload.time_out_seconds, job.remaining_compute_budget_seconds) + ) + speckle_client = setup_client(job.payload) job_id = job.id @@ -98,17 +109,23 @@ async def job_processor(logger: structlog.stdlib.BoundLogger): ) logger = logger.bind(job_id=job_id, project_id=job.payload.project_id) - logger.info("starting job") + logger.info( + "starting job {job_id} for project {project_id}, attempt {attempt} / {max_attempts} with remaining compute budget {remaining_compute_budget_seconds}s and timeout {job_timeout}s", + attempt=attempt, + max_attempts=job.max_attempt, + remaining_compute_budget_seconds=job.remaining_compute_budget_seconds, + job_timeout=job_timeout, + ) handler = job_handler(speckle_client, job.payload, logger) # this will raise a TimeoutError if handler does not complete in time version, download_duration, parse_duration = await asyncio.wait_for( - handler, timeout=job.payload.time_out_seconds + handler, timeout=job_timeout ) version_id = version.id duration = time.time() - start logger.info( - "Finished job after {duration} created version {version_id}", + "Finished parsing job after {duration}s, creating version {version_id}", duration=duration, version_id=version_id, ) @@ -116,7 +133,7 @@ async def job_processor(logger: structlog.stdlib.BoundLogger): _ = speckle_client.file_import.finish_file_import_job( FileImportSuccessInput( project_id=job.payload.project_id, - # for some reason, the blob id identifies the job here + # the blob id identifies the "job" here job_id=job.payload.blob_id, result=FileImportResult( parser=parser, @@ -131,32 +148,28 @@ async def job_processor(logger: structlog.stdlib.BoundLogger): # mark it as succeeded so we do not enter any error handling routines on finalisation job_status = JobStatus.SUCCEEDED - except TimeoutError as te: - # if it times out we allow re-queueing until it reaches max tries - ex = te - if attempt >= job.max_attempt: - job_status = JobStatus.FAILED - else: - job_status = JobStatus.QUEUED - # raised if the task is canceled except Exception as e: # ex = e job_status = JobStatus.FAILED finally: - if job_status == JobStatus.QUEUED: - await return_job_to_queued(connection, job_id) - elif job_status == JobStatus.FAILED: + if duration <= 0: + # it probably failed before we calculated the duration, so calculate it now + duration = time.time() - start + await deduct_from_compute_budget( + connection, logger, job_id, floor(duration) + ) + + if job_status == JobStatus.FAILED: # we should be reporting the failure to the server logger.error("job processing failed", exc_info=ex) try: _ = speckle_client.file_import.finish_file_import_job( FileImportErrorInput( project_id=job.payload.project_id, - # for some reason, the blob id identifies the job here + # the blob id identifies the job to the server job_id=job.payload.blob_id, - # job_id=job_id, reason=str(ex), result=FileImportResult( parser=parser, @@ -170,9 +183,10 @@ async def job_processor(logger: structlog.stdlib.BoundLogger): # the server is responsible for moving failed jobs to the failed state, so the worker does not have to do anything further except Exception as ex: logger.error("failed to report job failure", exc_info=ex) - await return_job_to_queued(connection, job_id) - # The client will not pick up a queued job if it now has exceeded max attempts. - # The server is responsible for moving queued jobs which have exceeded maximum attempts to failed status. + # somehow we're in a weird state, let's return the job to the queued state + # where it will get picked up again until one of total timeout, max attempts, or exhausted compute budget is reached + # The server is responsible for garbage collecting jobs which have reached these error conditions and moving them to a failed status. + await return_job_to_queued(connection, logger, job_id) elif job_status == JobStatus.SUCCEEDED: # do nothing # we expect the job to already be marked as succeeded in the database by the server (when the worker reported the results back to the server) diff --git a/packages/ifc-import-service/src/ifc_importer/repository.py b/packages/ifc-import-service/src/ifc_importer/repository.py index 0c0b1b66e..128132c3b 100644 --- a/packages/ifc-import-service/src/ifc_importer/repository.py +++ b/packages/ifc-import-service/src/ifc_importer/repository.py @@ -1,5 +1,6 @@ import json +import structlog from asyncpg import Connection, connect from ifc_importer.config import settings @@ -30,15 +31,17 @@ async def get_next_job(connection: Connection) -> FileimportJob | None: "updatedAt" = NOW() WHERE id = ( SELECT id FROM background_jobs - WHERE ( --queued job + WHERE ( -- job in a QUEUED state which has not yet exceeded maximum attempts and has a positive remaining compute budget payload ->> 'fileType' = 'ifc' AND status = $2 AND "attempt" < "maxAttempt" + AND "remainingComputeBudgetSeconds"::int > 0 ) - OR ( --timed job left on processing state + OR ( -- any job left in a PROCESSING state for more than its timeout period payload ->> 'fileType' = 'ifc' AND status = $1 - AND "updatedAt" < NOW() - ("timeoutMs" * interval '1 millisecond') + AND "attempt" <= "maxAttempt" + AND "updatedAt" < NOW() - (payload ->> 'timeOutSeconds')::int * interval '1 second' ) ORDER BY "createdAt" FOR UPDATE SKIP LOCKED @@ -56,21 +59,54 @@ async def get_next_job(connection: Connection) -> FileimportJob | None: return FileimportJob.model_validate(dict(job)) -async def return_job_to_queued(connection: Connection, job_id: str) -> None: - print(f"returning job: {job_id} to queued") - return await set_job_status(connection, job_id, JobStatus.QUEUED) +async def return_job_to_queued( + connection: Connection, logger: structlog.stdlib.BoundLogger, job_id: str +) -> None: + logger.info("returning job: {job_id} to queued", job_id=job_id) + return await set_job_status(connection, logger, job_id, JobStatus.QUEUED) async def set_job_status( - connection: Connection, job_id: str, job_status: JobStatus + connection: Connection, + logger: structlog.stdlib.BoundLogger, + job_id: str, + job_status: JobStatus, ) -> None: - print(f"updating job: {job_id}'s status to {job_status}") + logger.info( + "updating job: {job_id}'s status to {job_status}", + job_id=job_id, + job_status=job_status.value, + ) _ = await connection.execute( """ UPDATE background_jobs - SET status = $1, "updatedAt" = NOW() + SET status = $1, + "updatedAt" = NOW() WHERE id = $2 """, job_status.value, job_id, ) + + +async def deduct_from_compute_budget( + connection: Connection, + logger: structlog.stdlib.BoundLogger, + job_id: str, + used_compute_time_seconds: int, +) -> None: + logger.info( + "updating job: {job_id}'s remaining compute budget by deducting {used_compute_time_seconds} seconds", + job_id=job_id, + used_compute_time_seconds=used_compute_time_seconds, + ) + _ = await connection.execute( + """ + UPDATE background_jobs + SET "remainingComputeBudgetSeconds" = "remainingComputeBudgetSeconds"::int - $1, + "updatedAt" = NOW() + WHERE id = $2 + """, + used_compute_time_seconds, + job_id, + ) diff --git a/packages/server/assets/fileuploads/typedefs/fileuploads.graphql b/packages/server/assets/fileuploads/typedefs/fileuploads.graphql index 00883c81e..a4a30832d 100644 --- a/packages/server/assets/fileuploads/typedefs/fileuploads.graphql +++ b/packages/server/assets/fileuploads/typedefs/fileuploads.graphql @@ -154,6 +154,10 @@ enum JobResultStatus { input FinishFileImportInput { projectId: String! + """ + This is the blob Id of the uploaded file. For legacy reasons it is named jobId. + Note: This is the not the background job Id. + """ jobId: String! status: JobResultStatus! reason: String diff --git a/packages/server/modules/backgroundjobs/domain.ts b/packages/server/modules/backgroundjobs/domain/types.ts similarity index 88% rename from packages/server/modules/backgroundjobs/domain.ts rename to packages/server/modules/backgroundjobs/domain/types.ts index d75eb47a6..b52b6f26c 100644 --- a/packages/server/modules/backgroundjobs/domain.ts +++ b/packages/server/modules/backgroundjobs/domain/types.ts @@ -19,7 +19,7 @@ export type BackgroundJobPayload = z.infer export type BackgroundJobConfig = { maxAttempt: number - timeoutMs: number + remainingComputeBudgetSeconds: number } export type BackgroundJob = BackgroundJobConfig & { @@ -39,15 +39,15 @@ export type StoreBackgroundJob = (args: { export type GetBackgroundJob = (args: { jobId: string }) => Promise | null> -export type FailQueuedBackgroundJobsWhichExceedMaximumAttempts< +export type FailQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget< T extends BackgroundJobPayload = BackgroundJobPayload > = (args: { originServerUrl: string; jobType: string }) => Promise[]> export type UpdateBackgroundJob = (args: { - jobId: string + payloadFilter: Partial status: BackgroundJobStatus - }) => Promise | null> + }) => Promise[]> export type GetBackgroundJobCount< T extends BackgroundJobPayload = BackgroundJobPayload diff --git a/packages/server/modules/backgroundjobs/migrations/20250904122458_compute_budget.ts b/packages/server/modules/backgroundjobs/migrations/20250904122458_compute_budget.ts new file mode 100644 index 000000000..62f66fbf7 --- /dev/null +++ b/packages/server/modules/backgroundjobs/migrations/20250904122458_compute_budget.ts @@ -0,0 +1,18 @@ +import { TIME, TIME_MS } from '@speckle/shared' +import { type Knex } from 'knex' + +const JOB_TABLE_NAME = 'background_jobs' + +export async function up(knex: Knex): Promise { + await knex.schema.alterTable(JOB_TABLE_NAME, (table) => { + table.integer('remainingComputeBudgetSeconds').defaultTo(TIME.hour).notNullable() + table.dropColumn('timeoutMs') + }) +} + +export async function down(knex: Knex): Promise { + await knex.schema.alterTable(JOB_TABLE_NAME, (table) => { + table.dropColumn('remainingComputeBudgetSeconds') + table.integer('timeoutMs').defaultTo(TIME_MS.day).notNullable() + }) +} diff --git a/packages/server/modules/backgroundjobs/repositories.ts b/packages/server/modules/backgroundjobs/repositories/backgroundjobs.ts similarity index 50% rename from packages/server/modules/backgroundjobs/repositories.ts rename to packages/server/modules/backgroundjobs/repositories/backgroundjobs.ts index 5fd75fd4b..2243f2ea2 100644 --- a/packages/server/modules/backgroundjobs/repositories.ts +++ b/packages/server/modules/backgroundjobs/repositories/backgroundjobs.ts @@ -1,8 +1,8 @@ import type { Knex } from 'knex' import type { - FailQueuedBackgroundJobsWhichExceedMaximumAttempts, + FailQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget, UpdateBackgroundJob -} from '@/modules/backgroundjobs/domain' +} from '@/modules/backgroundjobs/domain/types' import { type BackgroundJob, type BackgroundJobPayload, @@ -10,7 +10,7 @@ import { type GetBackgroundJobCount, type StoreBackgroundJob, BackgroundJobStatus -} from '@/modules/backgroundjobs/domain' +} from '@/modules/backgroundjobs/domain/types' import { buildTableHelper } from '@/modules/core/dbSchema' export const BackgroundJobs = buildTableHelper('background_jobs', [ @@ -19,11 +19,11 @@ export const BackgroundJobs = buildTableHelper('background_jobs', [ 'payload', 'status', 'originServerUrl', - 'timeoutMs', 'attempt', 'maxAttempt', 'createdAt', - 'updatedAt' + 'updatedAt', + 'remainingComputeBudgetSeconds' ]) type StoredBackgroundJob = BackgroundJob & { @@ -53,46 +53,55 @@ export const getBackgroundJobFactory = return job ?? null } -export const failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory = +export const failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory = + + ({ + db + }: { + db: Knex + }): FailQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget => + async ({ jobType, originServerUrl }) => { + const query = tables + .backgroundJobs(db) + .where(BackgroundJobs.withoutTablePrefix.col.originServerUrl, originServerUrl) + .andWhere(BackgroundJobs.withoutTablePrefix.col.jobType, jobType) + .whereIn(BackgroundJobs.withoutTablePrefix.col.status, [ + BackgroundJobStatus.Queued, + BackgroundJobStatus.Processing + ]) + .andWhere(function () { + this.where( + BackgroundJobs.withoutTablePrefix.col.attempt, + '>', // greater than because processing jobs may currently equal maxAttempt and still be running + db.raw('"maxAttempt"') // camelCase requires the column name to be wrapped in double quotes + ).orWhere( + BackgroundJobs.withoutTablePrefix.col.remainingComputeBudgetSeconds, + '<=', + 0 + ) + }) + .update({ + [BackgroundJobs.withoutTablePrefix.col.status]: BackgroundJobStatus.Failed + }) + .orderBy(BackgroundJobs.withoutTablePrefix.col.createdAt, 'desc') + .returning[]>('*') + + return await query + } + +export const updateBackgroundJobFactory = ({ db }: { db: Knex - }): FailQueuedBackgroundJobsWhichExceedMaximumAttempts => - async ({ jobType, originServerUrl }) => { - const query = tables - .backgroundJobs(db) - .where(BackgroundJobs.withoutTablePrefix.col.originServerUrl, originServerUrl) - .andWhere( - BackgroundJobs.withoutTablePrefix.col.status, - BackgroundJobStatus.Queued - ) - .andWhere(BackgroundJobs.withoutTablePrefix.col.jobType, jobType) - .andWhere( - BackgroundJobs.withoutTablePrefix.col.attempt, - '>=', - db.raw('"maxAttempt"') // camel-case requires the column name to be wrapped in double quotes - ) - .orderBy(BackgroundJobs.withoutTablePrefix.col.createdAt, 'desc') - .update({ - [BackgroundJobs.withoutTablePrefix.col.status]: BackgroundJobStatus.Failed - }) - .returning[]>('*') - - return await query - } - -export const updateBackgroundJobFactory = - ({ db }: { db: Knex }): UpdateBackgroundJob => - async ({ jobId, status }) => { + }): UpdateBackgroundJob => + async ({ payloadFilter, status }) => { const query = tables .backgroundJobs(db) .update({ status }) - .where({ id: jobId }) - .returning('*') - const rows = await query - if (rows.length === 0) return null - return rows[0] + .whereJsonSupersetOf('payload', payloadFilter) + .returning[]>('*') + return await query } export const getBackgroundJobCountFactory = diff --git a/packages/server/modules/backgroundjobs/services/create.ts b/packages/server/modules/backgroundjobs/services/create.ts index e8297b3ec..cf906dce8 100644 --- a/packages/server/modules/backgroundjobs/services/create.ts +++ b/packages/server/modules/backgroundjobs/services/create.ts @@ -3,8 +3,8 @@ import type { BackgroundJobConfig, BackgroundJobPayload, StoreBackgroundJob -} from '@/modules/backgroundjobs/domain' -import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain' +} from '@/modules/backgroundjobs/domain/types' +import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain/types' import cryptoRandomString from 'crypto-random-string' export const createBackgroundJobFactory = ({ diff --git a/packages/server/modules/backgroundjobs/tests/integration/repositories.spec.ts b/packages/server/modules/backgroundjobs/tests/integration/repositories.spec.ts index dfb12dd88..d6f37c40c 100644 --- a/packages/server/modules/backgroundjobs/tests/integration/repositories.spec.ts +++ b/packages/server/modules/backgroundjobs/tests/integration/repositories.spec.ts @@ -4,13 +4,13 @@ import { getBackgroundJobFactory, BackgroundJobs, getBackgroundJobCountFactory, - failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory -} from '@/modules/backgroundjobs/repositories' + failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory +} from '@/modules/backgroundjobs/repositories/backgroundjobs' import type { BackgroundJob, BackgroundJobPayload -} from '@/modules/backgroundjobs/domain' -import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain' +} from '@/modules/backgroundjobs/domain/types' +import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain/types' import { expect } from 'chai' import { createRandomString } from '@/modules/core/helpers/testHelpers' @@ -35,9 +35,9 @@ const createTestJob = ( status: BackgroundJobStatus.Queued, attempt: 0, maxAttempt: 3, - timeoutMs: 30000, createdAt: new Date(), updatedAt: new Date(), + remainingComputeBudgetSeconds: 30, ...overrides }) @@ -173,18 +173,62 @@ describe('Background Jobs repositories @backgroundjobs', () => { }) }) - describe('failQueuedBackgroundJobsWhichExceedMaximumAttempts', () => { + describe('failQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory', () => { it('should fail queued background jobs that exceed maximum attempts', async () => { const job = createTestJob({ status: BackgroundJobStatus.Queued, - attempt: 2, + attempt: 3, maxAttempt: 2 }) await storeBackgroundJob({ job }) - const SUT = failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory({ - db + const SUT = + failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory({ + db + }) + + await SUT({ originServerUrl, jobType: 'fileImport' }) + + const updatedJob = await db(BackgroundJobs.name).where({ id: job.id }).first() + expect(updatedJob.status).to.equal(BackgroundJobStatus.Failed) + }) + + it('should fail queued background jobs with zero compute budget', async () => { + const job = createTestJob({ + payload: { + jobType: 'fileImport', + payloadVersion: 1, + testData: 'complex-test-data' + }, + remainingComputeBudgetSeconds: 0 }) + await storeBackgroundJob({ job }) + + const SUT = + failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory({ + db + }) + + await SUT({ originServerUrl, jobType: 'fileImport' }) + + const updatedJob = await db(BackgroundJobs.name).where({ id: job.id }).first() + expect(updatedJob.status).to.equal(BackgroundJobStatus.Failed) + }) + it('should fail queued background jobs with negative compute budget', async () => { + const job = createTestJob({ + payload: { + jobType: 'fileImport', + payloadVersion: 1, + testData: 'complex-test-data' + }, + remainingComputeBudgetSeconds: -100 + }) + await storeBackgroundJob({ job }) + + const SUT = + failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory({ + db + }) await SUT({ originServerUrl, jobType: 'fileImport' }) diff --git a/packages/server/modules/backgroundjobs/tests/unit/services.spec.ts b/packages/server/modules/backgroundjobs/tests/unit/services.spec.ts index e2e45ea6e..c0d4f0dc9 100644 --- a/packages/server/modules/backgroundjobs/tests/unit/services.spec.ts +++ b/packages/server/modules/backgroundjobs/tests/unit/services.spec.ts @@ -5,13 +5,13 @@ import type { BackgroundJobConfig, BackgroundJobPayload, StoreBackgroundJob -} from '@/modules/backgroundjobs/domain' -import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain' +} from '@/modules/backgroundjobs/domain/types' +import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain/types' describe('scheduleBackgroundJobFactory', () => { const mockJobConfig: BackgroundJobConfig = { maxAttempt: 3, - timeoutMs: 30000 + remainingComputeBudgetSeconds: 30 } interface TestJobPayload extends BackgroundJobPayload { @@ -96,7 +96,9 @@ describe('scheduleBackgroundJobFactory', () => { const result = await createBackgroundJob({ jobPayload: mockJobPayload }) expect(result.maxAttempt).to.equal(mockJobConfig.maxAttempt) - expect(result.timeoutMs).to.equal(mockJobConfig.timeoutMs) + expect(result.remainingComputeBudgetSeconds).to.equal( + mockJobConfig.remainingComputeBudgetSeconds + ) }) it('should preserve job payload', async () => { diff --git a/packages/server/modules/fileuploads/domain/consts.ts b/packages/server/modules/fileuploads/domain/consts.ts index 8e99c722e..06a2129af 100644 --- a/packages/server/modules/fileuploads/domain/consts.ts +++ b/packages/server/modules/fileuploads/domain/consts.ts @@ -1,10 +1,13 @@ +import { getFileImportTimeLimitMinutes } from '@/modules/shared/helpers/envHelper' +import { TIME, TIME_MS } from '@speckle/shared' + export const FileUploadDatabaseEvents = { Updated: 'file_import_update', Started: 'file_import_started' } as const export const DelayBetweenFileImportRetriesMinutes = 5 -export const NumberOfFileImportRetries = 5 +export const NumberOfFileImportRetries = 3 export const BackgroundJobType = { FileImport: 'fileImport' } as const @@ -18,3 +21,11 @@ export const BackgroundJobPayloadVersion = { export type BackgroundJobPayloadVersion = (typeof BackgroundJobPayloadVersion)[keyof typeof BackgroundJobPayloadVersion] + +export const maximumAllowedQueuingProcessingAndRetryTimeMs = () => 1 * TIME_MS.day +// NumberOfFileImportRetries * +// (getFileImportTimeLimitMinutes() + DelayBetweenFileImportRetriesMinutes + 1) * +// TIME_MS.minute // allowing an extra minute for some buffer + +export const singleAttemptMaximumProcessingTimeSeconds = () => + getFileImportTimeLimitMinutes() * TIME.minute diff --git a/packages/server/modules/fileuploads/domain/operations.ts b/packages/server/modules/fileuploads/domain/operations.ts index 7981521ef..174957e55 100644 --- a/packages/server/modules/fileuploads/domain/operations.ts +++ b/packages/server/modules/fileuploads/domain/operations.ts @@ -7,7 +7,7 @@ import type { Optional } from '@speckle/shared' import type { UploadResult } from '@/modules/blobstorage/domain/types' import type { FileImportResultPayload, - JobPayload + JobPayloadV1 } from '@speckle/shared/workers/fileimport' export type GetFileInfo = (args: { @@ -67,7 +67,7 @@ export type NotifyChangeInFileStatus = (params: { }) => Promise export type ProcessFileImportResult = (params: { - jobId: string + blobId: string jobResult: FileImportResultPayload }) => Promise @@ -82,11 +82,11 @@ export type UpdateFileStatus = (params: { export type UploadedFile = UploadResult & { userId: string } export type FileImportMessage = Pick< - JobPayload, + JobPayloadV1, 'modelId' | 'projectId' | 'fileType' | 'fileName' | 'blobId' -> & { jobId: string; userId: string } +> & { userId: string } -export type ScheduleFileimportJob = (args: JobPayload) => Promise +export type ScheduleFileimportJob = (args: JobPayloadV1) => Promise export type PushJobToFileImporter = ( args: { scheduleJob: ScheduleFileimportJob } & FileImportMessage diff --git a/packages/server/modules/fileuploads/graph/resolvers/fileUploads.ts b/packages/server/modules/fileuploads/graph/resolvers/fileUploads.ts index e652e09df..08884c0f1 100644 --- a/packages/server/modules/fileuploads/graph/resolvers/fileUploads.ts +++ b/packages/server/modules/fileuploads/graph/resolvers/fileUploads.ts @@ -76,7 +76,7 @@ import { onFileImportResultFactory } from '@/modules/fileuploads/services/result import type { FileImportResultPayload } from '@speckle/shared/workers/fileimport' import { JobResultStatus } from '@speckle/shared/workers/fileimport' import type { GraphQLContext } from '@/modules/shared/helpers/typeHelper' -import { updateBackgroundJobFactory } from '@/modules/backgroundjobs/repositories' +import { updateBackgroundJobFactory } from '@/modules/backgroundjobs/repositories/backgroundjobs' import { configureClient } from '@/knexfile' const { FF_NEXT_GEN_FILE_IMPORTER_ENABLED } = getFeatureFlags() @@ -250,6 +250,8 @@ const fileUploadMutations: Resolvers['FileUploadMutations'] = { if (!FF_NEXT_GEN_FILE_IMPORTER_ENABLED) throw new MisconfiguredEnvironmentError('File import next gen is not enabled') + // NOTE: jobId in this context is actually the blobId of the uploaded file + // We keep the naming for backwards compatibility reasons const { projectId, jobId, status, warnings, reason, result } = args.input const userId = ctx.userId if (!userId) { @@ -294,7 +296,7 @@ const fileUploadMutations: Resolvers['FileUploadMutations'] = { projectId, streamId: projectId, //legacy userId, - jobId + blobId: jobId }) const projectDb = await getProjectDbClient({ projectId }) @@ -310,7 +312,7 @@ const fileUploadMutations: Resolvers['FileUploadMutations'] = { }) await onFileImportResult({ - jobId, + blobId: jobId, jobResult }) diff --git a/packages/server/modules/fileuploads/index.ts b/packages/server/modules/fileuploads/index.ts index 30589e8e4..33aa8ecf9 100644 --- a/packages/server/modules/fileuploads/index.ts +++ b/packages/server/modules/fileuploads/index.ts @@ -49,6 +49,7 @@ const { FF_NEXT_GEN_FILE_IMPORTER_ENABLED, FF_RHINO_FILE_IMPORTER_ENABLED } = getFeatureFlags() const EveryMinute = '*/1 * * * *' +const EveryFiveMinutes = '*/5 * * * *' const scheduledTasks: cron.ScheduledTask[] = [] @@ -110,46 +111,47 @@ export const init: SpeckleModule['init'] = async ({ scheduledTasks.push( await scheduleBackgroundJobGarbageCollection({ queueDb, + scheduleExecution, + cronExpression: EveryFiveMinutes + }) + ) + } else { + // feature flag is not enabled + scheduledTasks.push( + await scheduleFileImportExpiry({ scheduleExecution, cronExpression: EveryMinute }) ) + + await listenFor(FileUploadDatabaseEvents.Updated, async (msg) => { + const parsedMessage = parseMessagePayload(msg.payload) + if (!parsedMessage.streamId) return + const projectDb = await getProjectDbClient({ + projectId: parsedMessage.streamId + }) + + await onFileImportProcessedFactory({ + getFileInfo: getFileInfoFactory({ db: projectDb }), + getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }), + updateFileUpload: updateFileUploadFactory({ db: projectDb }), + eventEmit: getEventBus().emit + })(parsedMessage) + }) + + await listenFor(FileUploadDatabaseEvents.Started, async (msg) => { + const parsedMessage = parseMessagePayload(msg.payload) + if (!parsedMessage.streamId) return + const projectDb = await getProjectDbClient({ + projectId: parsedMessage.streamId + }) + await onFileProcessingFactory({ + getFileInfo: getFileInfoFactory({ db: projectDb }), + emitEvent: getEventBus().emit + })(parsedMessage) + }) } - scheduledTasks.push( - await scheduleFileImportExpiry({ - scheduleExecution, - cronExpression: EveryMinute - }) - ) - - await listenFor(FileUploadDatabaseEvents.Updated, async (msg) => { - const parsedMessage = parseMessagePayload(msg.payload) - if (!parsedMessage.streamId) return - const projectDb = await getProjectDbClient({ - projectId: parsedMessage.streamId - }) - - await onFileImportProcessedFactory({ - getFileInfo: getFileInfoFactory({ db: projectDb }), - getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }), - updateFileUpload: updateFileUploadFactory({ db: projectDb }), - eventEmit: getEventBus().emit - })(parsedMessage) - }) - - await listenFor(FileUploadDatabaseEvents.Started, async (msg) => { - const parsedMessage = parseMessagePayload(msg.payload) - if (!parsedMessage.streamId) return - const projectDb = await getProjectDbClient({ - projectId: parsedMessage.streamId - }) - await onFileProcessingFactory({ - getFileInfo: getFileInfoFactory({ db: projectDb }), - emitEvent: getEventBus().emit - })(parsedMessage) - }) - initializeEventListenersFactory({ db, observeResult })() reportSubscriptionEventsFactory({ publish, diff --git a/packages/server/modules/fileuploads/queues/fileimports.ts b/packages/server/modules/fileuploads/queues/fileimports.ts index d0ae00017..ad55190d0 100644 --- a/packages/server/modules/fileuploads/queues/fileimports.ts +++ b/packages/server/modules/fileuploads/queues/fileimports.ts @@ -1,16 +1,12 @@ -import { - getFileImportTimeLimitMinutes, - getServerOrigin -} from '@/modules/shared/helpers/envHelper' +import { getServerOrigin } from '@/modules/shared/helpers/envHelper' import type { Logger } from '@/observability/logging' -import { TIME_MS } from '@speckle/shared' -import type { JobPayload } from '@speckle/shared/workers/fileimport' +import type { JobPayloadV1 } from '@speckle/shared/workers/fileimport' import type { FileImportQueue } from '@/modules/fileuploads/domain/types' import { NumberOfFileImportRetries, - DelayBetweenFileImportRetriesMinutes, BackgroundJobType, - BackgroundJobPayloadVersion + BackgroundJobPayloadVersion, + singleAttemptMaximumProcessingTimeSeconds } from '@/modules/fileuploads/domain/consts' import type { Knex } from 'knex' import { migrateDbToLatest } from '@/db/migrations' @@ -18,16 +14,11 @@ import { createBackgroundJobFactory } from '@/modules/backgroundjobs/services/cr import { getBackgroundJobCountFactory, storeBackgroundJobFactory -} from '@/modules/backgroundjobs/repositories' -import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain' +} from '@/modules/backgroundjobs/repositories/backgroundjobs' +import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain/types' export const fileImportQueues: FileImportQueue[] = [] -const timeout = - NumberOfFileImportRetries * - (getFileImportTimeLimitMinutes() + DelayBetweenFileImportRetriesMinutes) * - TIME_MS.minute - export const initializePostgresQueue = async ({ label, supportedFileTypes, @@ -41,7 +32,10 @@ export const initializePostgresQueue = async ({ await migrateDbToLatest({ db, region: `Queue DB for ${label}` }) const createBackgroundJob = createBackgroundJobFactory({ - jobConfig: { maxAttempt: 3, timeoutMs: timeout }, + jobConfig: { + maxAttempt: NumberOfFileImportRetries, + remainingComputeBudgetSeconds: 2 * singleAttemptMaximumProcessingTimeSeconds() + }, storeBackgroundJob: storeBackgroundJobFactory({ db, originServerUrl: getServerOrigin() @@ -55,7 +49,7 @@ export const initializePostgresQueue = async ({ (type) => type.toLocaleLowerCase() // Normalize file types to lowercase (this is a safeguard to prevent stupid typos in the future) ), shutdown: async () => {}, - scheduleJob: async (jobData: JobPayload) => { + scheduleJob: async (jobData: JobPayloadV1) => { await createBackgroundJob({ jobPayload: { jobType: BackgroundJobType.FileImport, diff --git a/packages/server/modules/fileuploads/services/createFileImport.ts b/packages/server/modules/fileuploads/services/createFileImport.ts index c26612086..41167ec60 100644 --- a/packages/server/modules/fileuploads/services/createFileImport.ts +++ b/packages/server/modules/fileuploads/services/createFileImport.ts @@ -1,12 +1,11 @@ import type { CreateAndStoreAppToken } from '@/modules/core/domain/tokens/operations' import { DefaultAppIds } from '@/modules/auth/defaultApps' -import { Scopes, TIME, TIME_MS } from '@speckle/shared' +import { Scopes } from '@speckle/shared' import { TokenResourceIdentifierType } from '@/modules/core/graph/generated/graphql' import type { PushJobToFileImporter } from '@/modules/fileuploads/domain/operations' -import { getFileImportTimeLimitMinutes } from '@/modules/shared/helpers/envHelper' import { - DelayBetweenFileImportRetriesMinutes, - NumberOfFileImportRetries + maximumAllowedQueuingProcessingAndRetryTimeMs, + singleAttemptMaximumProcessingTimeSeconds } from '@/modules/fileuploads/domain/consts' export const pushJobToFileImporterFactory = @@ -21,18 +20,14 @@ export const pushJobToFileImporterFactory = userId, fileName, fileType, - blobId, - jobId + blobId }): Promise => { const token = await deps.createAppToken({ appId: DefaultAppIds.Web, name: `fileimport-${projectId}@${modelId}`, userId, scopes: [Scopes.Streams.Write, Scopes.Streams.Read, Scopes.Profile.Read], - lifespan: - NumberOfFileImportRetries * - (getFileImportTimeLimitMinutes() + DelayBetweenFileImportRetriesMinutes + 1) * - TIME_MS.minute, // allowing an extra minute for some buffer + lifespan: maximumAllowedQueuingProcessingAndRetryTimeMs(), limitResources: [ { id: projectId, @@ -42,14 +37,13 @@ export const pushJobToFileImporterFactory = }) await scheduleJob({ - jobId, fileName, token, serverUrl: deps.getServerOrigin(), modelId, fileType, projectId, - timeOutSeconds: getFileImportTimeLimitMinutes() * TIME.minute, + timeOutSeconds: singleAttemptMaximumProcessingTimeSeconds(), blobId }) } diff --git a/packages/server/modules/fileuploads/services/management.ts b/packages/server/modules/fileuploads/services/management.ts index e8c400946..e2918098b 100644 --- a/packages/server/modules/fileuploads/services/management.ts +++ b/packages/server/modules/fileuploads/services/management.ts @@ -58,7 +58,6 @@ export const insertNewUploadAndNotifyFactoryV2 = projectId: file.projectId, modelId: upload.modelId, blobId: file.id, - jobId: file.id, userId: upload.userId }) diff --git a/packages/server/modules/fileuploads/services/resultHandler.ts b/packages/server/modules/fileuploads/services/resultHandler.ts index 8766424d6..622f7f2ac 100644 --- a/packages/server/modules/fileuploads/services/resultHandler.ts +++ b/packages/server/modules/fileuploads/services/resultHandler.ts @@ -16,13 +16,16 @@ import { FileuploadEvents } from '@/modules/fileuploads/domain/events' import { BackgroundJobStatus, type UpdateBackgroundJob -} from '@/modules/backgroundjobs/domain' -import { JobResultStatus } from '@speckle/shared/workers/fileimport' +} from '@/modules/backgroundjobs/domain/types' +import { + type FileImportJobPayloadV1, + JobResultStatus +} from '@speckle/shared/workers/fileimport' type OnFileImportResultDeps = { getFileInfo: GetFileInfoV2 updateFileUpload: UpdateFileUpload - updateBackgroundJob: UpdateBackgroundJob + updateBackgroundJob: UpdateBackgroundJob eventEmit: EventBusEmit logger: Logger FF_NEXT_GEN_FILE_IMPORTER_ENABLED: boolean @@ -32,15 +35,15 @@ export const onFileImportResultFactory = (deps: OnFileImportResultDeps): ProcessFileImportResult => async (params) => { const { logger } = deps - const { jobId, jobResult } = params + const { blobId, jobResult } = params - const fileInfo = await deps.getFileInfo({ fileId: jobId }) + const fileInfo = await deps.getFileInfo({ fileId: blobId }) if (!fileInfo) { - throw new FileImportJobNotFoundError(`File upload with ID ${jobId} not found`) + throw new FileImportJobNotFoundError(`File upload with ID ${blobId} not found`) } const boundLogger = logger.child({ - jobId, + blobId, fileId: fileInfo.id, fileSize: fileInfo.fileSize, fileName: fileInfo.fileName, @@ -85,15 +88,14 @@ export const onFileImportResultFactory = if (deps.FF_NEXT_GEN_FILE_IMPORTER_ENABLED) { try { await deps.updateBackgroundJob({ - jobId, + payloadFilter: { blobId }, status: newStatusForBackgroundJob }) } catch (e) { const err = ensureError(e) logger.error( - { err }, - 'Error updating background job status in database. Job ID: %s', - jobId + { err, blobId }, + 'Error updating background jobs status in database. Blob ID: {blobId}' ) throw err } @@ -102,7 +104,7 @@ export const onFileImportResultFactory = let updatedFile: FileUploadRecord try { updatedFile = await deps.updateFileUpload({ - id: jobId, + id: blobId, upload: { convertedStatus: status, convertedLastUpdate: new Date(), @@ -118,9 +120,8 @@ export const onFileImportResultFactory = } catch (e) { const err = ensureError(e) logger.error( - { err }, - 'Error updating imported file status in database. File ID: %s', - jobId + { err, info: { fileId: blobId } }, + 'Error updating imported file status in database. File ID: {fileId}' ) throw err } @@ -139,7 +140,7 @@ export const onFileImportResultFactory = await deps.eventEmit({ eventName: FileuploadEvents.Finished, payload: { - jobId, + jobId: blobId, jobResult } }) diff --git a/packages/server/modules/fileuploads/services/tasks.ts b/packages/server/modules/fileuploads/services/tasks.ts index adb47e0db..5cf0cc068 100644 --- a/packages/server/modules/fileuploads/services/tasks.ts +++ b/packages/server/modules/fileuploads/services/tasks.ts @@ -4,10 +4,9 @@ import type { GarbageCollectPendingUploadedFiles, NotifyChangeInFileStatus } from '@/modules/fileuploads/domain/operations' -import type { FailQueuedBackgroundJobsWhichExceedMaximumAttempts } from '@/modules/backgroundjobs/domain' +import type { FailQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget } from '@/modules/backgroundjobs/domain/types' import type { FileImportJobPayloadV1 } from '@speckle/shared/workers/fileimport' import { BackgroundJobType } from '@/modules/fileuploads/domain/consts' -import { LogicError } from '@/modules/shared/errors' export const manageFileImportExpiryFactory = (deps: { garbageCollectExpiredPendingUploads: GarbageCollectPendingUploadedFiles @@ -29,12 +28,12 @@ export const manageFileImportExpiryFactory = (deps: { } export const garbageCollectAttemptedFileImportBackgroundJobsFactory = (deps: { - failQueuedBackgroundJobsWhichExceedMaximumAttempts: FailQueuedBackgroundJobsWhichExceedMaximumAttempts + failQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget: FailQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget failPendingUploadedFiles: FailPendingUploadedFiles notifyUploadStatus: NotifyChangeInFileStatus }): ((params: { logger: Logger; originServerUrl: string }) => Promise) => { const { - failQueuedBackgroundJobsWhichExceedMaximumAttempts, + failQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget, failPendingUploadedFiles, notifyUploadStatus } = deps @@ -42,30 +41,31 @@ export const garbageCollectAttemptedFileImportBackgroundJobsFactory = (deps: { const { logger, originServerUrl } = params const failedBackgroundJobs = - await failQueuedBackgroundJobsWhichExceedMaximumAttempts({ - originServerUrl, - jobType: BackgroundJobType.FileImport - }) + await failQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget( + { + originServerUrl, + jobType: BackgroundJobType.FileImport + } + ) + logger.info( - `Found ${failedBackgroundJobs.length} background jobs which have exceeded maximum number of attempts` + { numberOfFailedBackgroundJobs: failedBackgroundJobs.length }, + 'Found {numberOfFailedBackgroundJobs} background jobs which have exceeded maximum number of attempts or exceeded their compute budget' ) if (failedBackgroundJobs.length === 0) { return } - const fileIds = failedBackgroundJobs.map((job) => job.payload.blobId) - if (fileIds.length !== failedBackgroundJobs.length || fileIds.some((id) => !id)) { - throw new LogicError( - 'We do not have a valid file Id for all failed background jobs', - { - info: { - fileIds - } - } - ) + const validFailedBackgroundJobs = failedBackgroundJobs.filter( + (job) => !!job.payload.blobId + ) + if (validFailedBackgroundJobs.length !== failedBackgroundJobs.length) { + logger.warn('Some failed background jobs do not have a valid blob ID') } + const fileIds = validFailedBackgroundJobs.map((job) => job.payload.blobId) + const updatedUploads = await failPendingUploadedFiles({ uploadIds: fileIds }) diff --git a/packages/server/modules/fileuploads/tasks/expireFileImports.ts b/packages/server/modules/fileuploads/tasks/expireFileImports.ts index 1b9443b44..60427a646 100644 --- a/packages/server/modules/fileuploads/tasks/expireFileImports.ts +++ b/packages/server/modules/fileuploads/tasks/expireFileImports.ts @@ -1,15 +1,11 @@ import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management' import { expireOldPendingUploadsFactory } from '@/modules/fileuploads/repositories/fileUploads' import { db } from '@/db/knex' -import { getFileImportTimeLimitMinutes } from '@/modules/shared/helpers/envHelper' import { getRegisteredDbClients } from '@/modules/multiregion/utils/dbSelector' import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations' import { manageFileImportExpiryFactory } from '@/modules/fileuploads/services/tasks' -import { TIME } from '@speckle/shared' -import { - DelayBetweenFileImportRetriesMinutes, - NumberOfFileImportRetries -} from '@/modules/fileuploads/domain/consts' +import { TIME_MS } from '@speckle/shared' +import { maximumAllowedQueuingProcessingAndRetryTimeMs } from '@/modules/fileuploads/domain/consts' import { getEventBus } from '@/modules/shared/services/eventBus' export const scheduleFileImportExpiry = async ({ @@ -44,11 +40,7 @@ export const scheduleFileImportExpiry = async ({ handler({ logger, timeoutThresholdSeconds: - (NumberOfFileImportRetries * - (getFileImportTimeLimitMinutes() + - DelayBetweenFileImportRetriesMinutes) + - 1) * // additional buffer of 1 minute - TIME.minute + maximumAllowedQueuingProcessingAndRetryTimeMs() / TIME_MS.second }) ) ) diff --git a/packages/server/modules/fileuploads/tasks/garbageCollectBackgroundJobs.ts b/packages/server/modules/fileuploads/tasks/garbageCollectBackgroundJobs.ts index 685d542ca..c1c666873 100644 --- a/packages/server/modules/fileuploads/tasks/garbageCollectBackgroundJobs.ts +++ b/packages/server/modules/fileuploads/tasks/garbageCollectBackgroundJobs.ts @@ -5,7 +5,7 @@ import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/ope import { getEventBus } from '@/modules/shared/services/eventBus' import { garbageCollectAttemptedFileImportBackgroundJobsFactory } from '@/modules/fileuploads/services/tasks' import { failPendingUploadedFilesFactory } from '@/modules/fileuploads/repositories/fileUploads' -import { failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory } from '@/modules/backgroundjobs/repositories' +import { failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory } from '@/modules/backgroundjobs/repositories/backgroundjobs' import type { Knex } from 'knex' import { getServerOrigin } from '@/modules/shared/helpers/envHelper' @@ -25,10 +25,12 @@ export const scheduleBackgroundJobGarbageCollection = async ({ for (const projectDb of [db, ...regionClients]) { perDbTask.push( garbageCollectAttemptedFileImportBackgroundJobsFactory({ - failQueuedBackgroundJobsWhichExceedMaximumAttempts: - failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory({ - db: queueDb - }), + failQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget: + failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory( + { + db: queueDb + } + ), failPendingUploadedFiles: failPendingUploadedFilesFactory({ db: projectDb }), notifyUploadStatus: notifyChangeInFileStatus({ eventEmit: getEventBus().emit diff --git a/packages/server/modules/fileuploads/tests/helpers/creation.ts b/packages/server/modules/fileuploads/tests/helpers/creation.ts index ded115fda..671ca61fc 100644 --- a/packages/server/modules/fileuploads/tests/helpers/creation.ts +++ b/packages/server/modules/fileuploads/tests/helpers/creation.ts @@ -37,8 +37,7 @@ export const buildFileUploadMessage = ( fileType: cryptoRandomString({ length: 10 }), fileName: cryptoRandomString({ length: 10 }), blobId: cryptoRandomString({ length: 10 }), - userId: cryptoRandomString({ length: 10 }), - jobId: cryptoRandomString({ length: 10 }) + userId: cryptoRandomString({ length: 10 }) } return assign(defaults, override) diff --git a/packages/server/modules/fileuploads/tests/integration/tasks.spec.ts b/packages/server/modules/fileuploads/tests/integration/tasks.spec.ts index 0504f3f0d..a45864e49 100644 --- a/packages/server/modules/fileuploads/tests/integration/tasks.spec.ts +++ b/packages/server/modules/fileuploads/tests/integration/tasks.spec.ts @@ -2,10 +2,10 @@ import { expect } from 'chai' import { garbageCollectAttemptedFileImportBackgroundJobsFactory } from '@/modules/fileuploads/services/tasks' import { BackgroundJobs, - failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory, + failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory, getBackgroundJobFactory, storeBackgroundJobFactory -} from '@/modules/backgroundjobs/repositories' +} from '@/modules/backgroundjobs/repositories/backgroundjobs' import { db } from '@/db/knex' import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management' import { getEventBus } from '@/modules/shared/services/eventBus' @@ -19,7 +19,7 @@ import { type BackgroundJobPayload, BackgroundJobStatus, type BackgroundJob -} from '@/modules/backgroundjobs/domain' +} from '@/modules/backgroundjobs/domain/types' import type { FileImportJobPayloadV1 } from '@speckle/shared/workers/fileimport' import cryptoRandomString from 'crypto-random-string' import type { FileUploadRecordV2 } from '@/modules/fileuploads/helpers/types' @@ -55,7 +55,7 @@ export const createTestJob = ( status: BackgroundJobStatus.Queued, attempt: 0, maxAttempt: 3, - timeoutMs: 30000, + remainingComputeBudgetSeconds: 300, createdAt: new Date(), updatedAt: new Date(), ...overrides @@ -140,8 +140,8 @@ describe('File import garbage collection @fileuploads integration', () => { describe('garbage collect file import background jobs', () => { const SUT = garbageCollectAttemptedFileImportBackgroundJobsFactory({ - failQueuedBackgroundJobsWhichExceedMaximumAttempts: - failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory({ + failQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget: + failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory({ db }), failPendingUploadedFiles: failPendingUploadedFilesFactory({ @@ -173,19 +173,19 @@ describe('File import garbage collection @fileuploads integration', () => { projectId: projectOne.id, modelId: modelOne.id }) - const queuedJobAtMaxAttempts = createTestJob({ + const queuedJobExceedingMaxAttempts = createTestJob({ status: BackgroundJobStatus.Queued, payload: createTestJobPayload({ testData: identifiableData, blobId: fileTwo.fileId }), - attempt: 3, + attempt: 4, maxAttempt: 3 }) await saveUploadFile(fileOne) await storeBackgroundJob({ job: processingJobAtMaxAttempts }) await saveUploadFile(fileTwo) - await storeBackgroundJob({ job: queuedJobAtMaxAttempts }) + await storeBackgroundJob({ job: queuedJobExceedingMaxAttempts }) // ensure jobs are in the database and retrievable const existing = await db(BackgroundJobs.name) @@ -210,7 +210,9 @@ describe('File import garbage collection @fileuploads integration', () => { expect(resultOne?.status).to.equal(BackgroundJobStatus.Processing) // queued job should have been garbage collected - const resultTwo = await getBackgroundJob({ jobId: queuedJobAtMaxAttempts.id }) + const resultTwo = await getBackgroundJob({ + jobId: queuedJobExceedingMaxAttempts.id + }) expect(resultTwo?.status).to.equal(BackgroundJobStatus.Failed) const fileOneResult = await getUploadFile({ fileId: fileOne.fileId }) diff --git a/packages/server/modules/fileuploads/tests/unit/fileuploads.spec.ts b/packages/server/modules/fileuploads/tests/unit/fileuploads.spec.ts index 56f78f599..ba4e14eca 100644 --- a/packages/server/modules/fileuploads/tests/unit/fileuploads.spec.ts +++ b/packages/server/modules/fileuploads/tests/unit/fileuploads.spec.ts @@ -20,7 +20,7 @@ import { pushJobToFileImporterFactory } from '@/modules/fileuploads/services/cre import { assign, get } from 'lodash-es' import { buildFileUploadMessage } from '@/modules/fileuploads/tests/helpers/creation' import { getFeatureFlags } from '@speckle/shared/environment' -import type { JobPayload } from '@speckle/shared/workers/fileimport' +import type { JobPayloadV1 } from '@speckle/shared/workers/fileimport' import type { EventBusEmit } from '@/modules/shared/services/eventBus' import { FileuploadEvents } from '@/modules/fileuploads/domain/events' import type { BranchRecord } from '@/modules/core/helpers/types' @@ -196,8 +196,7 @@ describe('FileUploads @fileuploads', () => { }) expect(usedUserId).to.equal(upload.userId) - const expected: JobPayload = { - jobId: upload.jobId, + const expected: JobPayloadV1 = { fileName: upload.fileName, token, serverUrl: serverOrigin, diff --git a/packages/shared/src/workers/fileimport/job.ts b/packages/shared/src/workers/fileimport/job.ts index 17294ea86..3f88333d8 100644 --- a/packages/shared/src/workers/fileimport/job.ts +++ b/packages/shared/src/workers/fileimport/job.ts @@ -1,26 +1,22 @@ import z from 'zod' import { TIME } from '../../core/index.js' -const job = z.object({ - jobId: z.string() +export const jobPayloadV1 = z.object({ + serverUrl: z.string().url().describe('The url of the server'), + projectId: z.string(), + modelId: z.string(), + token: z.string(), + blobId: z.string(), + fileType: z.string(), + fileName: z.string(), + timeOutSeconds: z + .number() + .int() + .default(30 * TIME.minute) + .describe('The timeout for a single attempt of the job, in seconds.') }) -export const jobPayload = job.merge( - z.object({ - serverUrl: z.string().url().describe('The url of the server'), - projectId: z.string(), - modelId: z.string(), - token: z.string(), - blobId: z.string(), - fileType: z.string(), - fileName: z.string(), - timeOutSeconds: z - .number() - .int() - .default(30 * TIME.minute) - }) -) -export type JobPayload = z.infer +export type JobPayloadV1 = z.infer const baseFileImportResult = z.object({ durationSeconds: z @@ -69,7 +65,7 @@ export const fileImportResultPayload = z.discriminatedUnion('status', [ export type FileImportResultPayload = z.infer -export type FileImportJobPayloadV1 = JobPayload & { +export type FileImportJobPayloadV1 = JobPayloadV1 & { jobType: 'fileImport' payloadVersion: 1 }