feat(server/fileuploads): background jobs draws down a compute budget (#5349)

This commit is contained in:
Iain Sproat
2025-09-08 16:26:41 +01:00
committed by GitHub
parent 67db5af3c5
commit 79ef564e1b
26 changed files with 368 additions and 246 deletions
+2 -1
View File
@@ -29,7 +29,8 @@ repos:
name: ruff checker (python linter)
args:
- --ignore=E501 # ignoring error about lines that are too long
# Run the linter.
# - --fix # uncomment to automatically fix linting errors
# Run the linter.
- id: ruff-format
name: ruff formatter (python formatter)
@@ -47,3 +47,4 @@ class FileimportJob(JobBase):
max_attempt: int
created_at: datetime
updated_at: datetime
remaining_compute_budget_seconds: int
@@ -1,6 +1,7 @@
import asyncio
import tempfile
import time
from math import floor
from pathlib import Path
import structlog
@@ -16,7 +17,12 @@ from specklepy.core.api.inputs.file_import_inputs import (
from specklepy.core.api.models import Version
from ifc_importer.domain import FileimportPayload, JobStatus
from ifc_importer.repository import get_next_job, return_job_to_queued, setup_connection
from ifc_importer.repository import (
deduct_from_compute_budget,
get_next_job,
return_job_to_queued,
setup_connection,
)
IDLE_TIMEOUT = 1
@@ -82,6 +88,11 @@ async def job_processor(logger: structlog.stdlib.BoundLogger):
continue
start = time.time()
duration = 0
job_timeout = max(
0, min(job.payload.time_out_seconds, job.remaining_compute_budget_seconds)
)
speckle_client = setup_client(job.payload)
job_id = job.id
@@ -98,17 +109,23 @@ async def job_processor(logger: structlog.stdlib.BoundLogger):
)
logger = logger.bind(job_id=job_id, project_id=job.payload.project_id)
logger.info("starting job")
logger.info(
"starting job {job_id} for project {project_id}, attempt {attempt} / {max_attempts} with remaining compute budget {remaining_compute_budget_seconds}s and timeout {job_timeout}s",
attempt=attempt,
max_attempts=job.max_attempt,
remaining_compute_budget_seconds=job.remaining_compute_budget_seconds,
job_timeout=job_timeout,
)
handler = job_handler(speckle_client, job.payload, logger)
# this will raise a TimeoutError if handler does not complete in time
version, download_duration, parse_duration = await asyncio.wait_for(
handler, timeout=job.payload.time_out_seconds
handler, timeout=job_timeout
)
version_id = version.id
duration = time.time() - start
logger.info(
"Finished job after {duration} created version {version_id}",
"Finished parsing job after {duration}s, creating version {version_id}",
duration=duration,
version_id=version_id,
)
@@ -116,7 +133,7 @@ async def job_processor(logger: structlog.stdlib.BoundLogger):
_ = speckle_client.file_import.finish_file_import_job(
FileImportSuccessInput(
project_id=job.payload.project_id,
# for some reason, the blob id identifies the job here
# the blob id identifies the "job" here
job_id=job.payload.blob_id,
result=FileImportResult(
parser=parser,
@@ -131,32 +148,28 @@ async def job_processor(logger: structlog.stdlib.BoundLogger):
# mark it as succeeded so we do not enter any error handling routines on finalisation
job_status = JobStatus.SUCCEEDED
except TimeoutError as te:
# if it times out we allow re-queueing until it reaches max tries
ex = te
if attempt >= job.max_attempt:
job_status = JobStatus.FAILED
else:
job_status = JobStatus.QUEUED
# raised if the task is canceled
except Exception as e:
#
ex = e
job_status = JobStatus.FAILED
finally:
if job_status == JobStatus.QUEUED:
await return_job_to_queued(connection, job_id)
elif job_status == JobStatus.FAILED:
if duration <= 0:
# it probably failed before we calculated the duration, so calculate it now
duration = time.time() - start
await deduct_from_compute_budget(
connection, logger, job_id, floor(duration)
)
if job_status == JobStatus.FAILED:
# we should be reporting the failure to the server
logger.error("job processing failed", exc_info=ex)
try:
_ = speckle_client.file_import.finish_file_import_job(
FileImportErrorInput(
project_id=job.payload.project_id,
# for some reason, the blob id identifies the job here
# the blob id identifies the job to the server
job_id=job.payload.blob_id,
# job_id=job_id,
reason=str(ex),
result=FileImportResult(
parser=parser,
@@ -170,9 +183,10 @@ async def job_processor(logger: structlog.stdlib.BoundLogger):
# the server is responsible for moving failed jobs to the failed state, so the worker does not have to do anything further
except Exception as ex:
logger.error("failed to report job failure", exc_info=ex)
await return_job_to_queued(connection, job_id)
# The client will not pick up a queued job if it now has exceeded max attempts.
# The server is responsible for moving queued jobs which have exceeded maximum attempts to failed status.
# somehow we're in a weird state, let's return the job to the queued state
# where it will get picked up again until one of total timeout, max attempts, or exhausted compute budget is reached
# The server is responsible for garbage collecting jobs which have reached these error conditions and moving them to a failed status.
await return_job_to_queued(connection, logger, job_id)
elif job_status == JobStatus.SUCCEEDED:
# do nothing
# we expect the job to already be marked as succeeded in the database by the server (when the worker reported the results back to the server)
@@ -1,5 +1,6 @@
import json
import structlog
from asyncpg import Connection, connect
from ifc_importer.config import settings
@@ -30,15 +31,17 @@ async def get_next_job(connection: Connection) -> FileimportJob | None:
"updatedAt" = NOW()
WHERE id = (
SELECT id FROM background_jobs
WHERE ( --queued job
WHERE ( -- job in a QUEUED state which has not yet exceeded maximum attempts and has a positive remaining compute budget
payload ->> 'fileType' = 'ifc'
AND status = $2
AND "attempt" < "maxAttempt"
AND "remainingComputeBudgetSeconds"::int > 0
)
OR ( --timed job left on processing state
OR ( -- any job left in a PROCESSING state for more than its timeout period
payload ->> 'fileType' = 'ifc'
AND status = $1
AND "updatedAt" < NOW() - ("timeoutMs" * interval '1 millisecond')
AND "attempt" <= "maxAttempt"
AND "updatedAt" < NOW() - (payload ->> 'timeOutSeconds')::int * interval '1 second'
)
ORDER BY "createdAt"
FOR UPDATE SKIP LOCKED
@@ -56,21 +59,54 @@ async def get_next_job(connection: Connection) -> FileimportJob | None:
return FileimportJob.model_validate(dict(job))
async def return_job_to_queued(connection: Connection, job_id: str) -> None:
print(f"returning job: {job_id} to queued")
return await set_job_status(connection, job_id, JobStatus.QUEUED)
async def return_job_to_queued(
connection: Connection, logger: structlog.stdlib.BoundLogger, job_id: str
) -> None:
logger.info("returning job: {job_id} to queued", job_id=job_id)
return await set_job_status(connection, logger, job_id, JobStatus.QUEUED)
async def set_job_status(
connection: Connection, job_id: str, job_status: JobStatus
connection: Connection,
logger: structlog.stdlib.BoundLogger,
job_id: str,
job_status: JobStatus,
) -> None:
print(f"updating job: {job_id}'s status to {job_status}")
logger.info(
"updating job: {job_id}'s status to {job_status}",
job_id=job_id,
job_status=job_status.value,
)
_ = await connection.execute(
"""
UPDATE background_jobs
SET status = $1, "updatedAt" = NOW()
SET status = $1,
"updatedAt" = NOW()
WHERE id = $2
""",
job_status.value,
job_id,
)
async def deduct_from_compute_budget(
connection: Connection,
logger: structlog.stdlib.BoundLogger,
job_id: str,
used_compute_time_seconds: int,
) -> None:
logger.info(
"updating job: {job_id}'s remaining compute budget by deducting {used_compute_time_seconds} seconds",
job_id=job_id,
used_compute_time_seconds=used_compute_time_seconds,
)
_ = await connection.execute(
"""
UPDATE background_jobs
SET "remainingComputeBudgetSeconds" = "remainingComputeBudgetSeconds"::int - $1,
"updatedAt" = NOW()
WHERE id = $2
""",
used_compute_time_seconds,
job_id,
)
@@ -154,6 +154,10 @@ enum JobResultStatus {
input FinishFileImportInput {
projectId: String!
"""
This is the blob Id of the uploaded file. For legacy reasons it is named jobId.
Note: This is the not the background job Id.
"""
jobId: String!
status: JobResultStatus!
reason: String
@@ -19,7 +19,7 @@ export type BackgroundJobPayload = z.infer<typeof BackgroundJobPayload>
export type BackgroundJobConfig = {
maxAttempt: number
timeoutMs: number
remainingComputeBudgetSeconds: number
}
export type BackgroundJob<T extends BackgroundJobPayload> = BackgroundJobConfig & {
@@ -39,15 +39,15 @@ export type StoreBackgroundJob = (args: {
export type GetBackgroundJob<T extends BackgroundJobPayload = BackgroundJobPayload> =
(args: { jobId: string }) => Promise<BackgroundJob<T> | null>
export type FailQueuedBackgroundJobsWhichExceedMaximumAttempts<
export type FailQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget<
T extends BackgroundJobPayload = BackgroundJobPayload
> = (args: { originServerUrl: string; jobType: string }) => Promise<BackgroundJob<T>[]>
export type UpdateBackgroundJob<T extends BackgroundJobPayload = BackgroundJobPayload> =
(args: {
jobId: string
payloadFilter: Partial<T>
status: BackgroundJobStatus
}) => Promise<BackgroundJob<T> | null>
}) => Promise<BackgroundJob<T>[]>
export type GetBackgroundJobCount<
T extends BackgroundJobPayload = BackgroundJobPayload
@@ -0,0 +1,18 @@
import { TIME, TIME_MS } from '@speckle/shared'
import { type Knex } from 'knex'
const JOB_TABLE_NAME = 'background_jobs'
export async function up(knex: Knex): Promise<void> {
await knex.schema.alterTable(JOB_TABLE_NAME, (table) => {
table.integer('remainingComputeBudgetSeconds').defaultTo(TIME.hour).notNullable()
table.dropColumn('timeoutMs')
})
}
export async function down(knex: Knex): Promise<void> {
await knex.schema.alterTable(JOB_TABLE_NAME, (table) => {
table.dropColumn('remainingComputeBudgetSeconds')
table.integer('timeoutMs').defaultTo(TIME_MS.day).notNullable()
})
}
@@ -1,8 +1,8 @@
import type { Knex } from 'knex'
import type {
FailQueuedBackgroundJobsWhichExceedMaximumAttempts,
FailQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget,
UpdateBackgroundJob
} from '@/modules/backgroundjobs/domain'
} from '@/modules/backgroundjobs/domain/types'
import {
type BackgroundJob,
type BackgroundJobPayload,
@@ -10,7 +10,7 @@ import {
type GetBackgroundJobCount,
type StoreBackgroundJob,
BackgroundJobStatus
} from '@/modules/backgroundjobs/domain'
} from '@/modules/backgroundjobs/domain/types'
import { buildTableHelper } from '@/modules/core/dbSchema'
export const BackgroundJobs = buildTableHelper('background_jobs', [
@@ -19,11 +19,11 @@ export const BackgroundJobs = buildTableHelper('background_jobs', [
'payload',
'status',
'originServerUrl',
'timeoutMs',
'attempt',
'maxAttempt',
'createdAt',
'updatedAt'
'updatedAt',
'remainingComputeBudgetSeconds'
])
type StoredBackgroundJob = BackgroundJob<BackgroundJobPayload> & {
@@ -53,46 +53,55 @@ export const getBackgroundJobFactory =
return job ?? null
}
export const failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory =
export const failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory =
<T extends BackgroundJobPayload = BackgroundJobPayload>({
db
}: {
db: Knex
}): FailQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget<T> =>
async ({ jobType, originServerUrl }) => {
const query = tables
.backgroundJobs(db)
.where(BackgroundJobs.withoutTablePrefix.col.originServerUrl, originServerUrl)
.andWhere(BackgroundJobs.withoutTablePrefix.col.jobType, jobType)
.whereIn(BackgroundJobs.withoutTablePrefix.col.status, [
BackgroundJobStatus.Queued,
BackgroundJobStatus.Processing
])
.andWhere(function () {
this.where(
BackgroundJobs.withoutTablePrefix.col.attempt,
'>', // greater than because processing jobs may currently equal maxAttempt and still be running
db.raw('"maxAttempt"') // camelCase requires the column name to be wrapped in double quotes
).orWhere(
BackgroundJobs.withoutTablePrefix.col.remainingComputeBudgetSeconds,
'<=',
0
)
})
.update({
[BackgroundJobs.withoutTablePrefix.col.status]: BackgroundJobStatus.Failed
})
.orderBy(BackgroundJobs.withoutTablePrefix.col.createdAt, 'desc')
.returning<BackgroundJob<T>[]>('*')
return await query
}
export const updateBackgroundJobFactory =
<T extends BackgroundJobPayload = BackgroundJobPayload>({
db
}: {
db: Knex
}): FailQueuedBackgroundJobsWhichExceedMaximumAttempts<T> =>
async ({ jobType, originServerUrl }) => {
const query = tables
.backgroundJobs(db)
.where(BackgroundJobs.withoutTablePrefix.col.originServerUrl, originServerUrl)
.andWhere(
BackgroundJobs.withoutTablePrefix.col.status,
BackgroundJobStatus.Queued
)
.andWhere(BackgroundJobs.withoutTablePrefix.col.jobType, jobType)
.andWhere(
BackgroundJobs.withoutTablePrefix.col.attempt,
'>=',
db.raw('"maxAttempt"') // camel-case requires the column name to be wrapped in double quotes
)
.orderBy(BackgroundJobs.withoutTablePrefix.col.createdAt, 'desc')
.update({
[BackgroundJobs.withoutTablePrefix.col.status]: BackgroundJobStatus.Failed
})
.returning<BackgroundJob<T>[]>('*')
return await query
}
export const updateBackgroundJobFactory =
({ db }: { db: Knex }): UpdateBackgroundJob =>
async ({ jobId, status }) => {
}): UpdateBackgroundJob<T> =>
async ({ payloadFilter, status }) => {
const query = tables
.backgroundJobs(db)
.update({ status })
.where({ id: jobId })
.returning('*')
const rows = await query
if (rows.length === 0) return null
return rows[0]
.whereJsonSupersetOf('payload', payloadFilter)
.returning<BackgroundJob<T>[]>('*')
return await query
}
export const getBackgroundJobCountFactory =
@@ -3,8 +3,8 @@ import type {
BackgroundJobConfig,
BackgroundJobPayload,
StoreBackgroundJob
} from '@/modules/backgroundjobs/domain'
import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain'
} from '@/modules/backgroundjobs/domain/types'
import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain/types'
import cryptoRandomString from 'crypto-random-string'
export const createBackgroundJobFactory = <T extends BackgroundJobPayload>({
@@ -4,13 +4,13 @@ import {
getBackgroundJobFactory,
BackgroundJobs,
getBackgroundJobCountFactory,
failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory
} from '@/modules/backgroundjobs/repositories'
failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory
} from '@/modules/backgroundjobs/repositories/backgroundjobs'
import type {
BackgroundJob,
BackgroundJobPayload
} from '@/modules/backgroundjobs/domain'
import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain'
} from '@/modules/backgroundjobs/domain/types'
import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain/types'
import { expect } from 'chai'
import { createRandomString } from '@/modules/core/helpers/testHelpers'
@@ -35,9 +35,9 @@ const createTestJob = (
status: BackgroundJobStatus.Queued,
attempt: 0,
maxAttempt: 3,
timeoutMs: 30000,
createdAt: new Date(),
updatedAt: new Date(),
remainingComputeBudgetSeconds: 30,
...overrides
})
@@ -173,18 +173,62 @@ describe('Background Jobs repositories @backgroundjobs', () => {
})
})
describe('failQueuedBackgroundJobsWhichExceedMaximumAttempts', () => {
describe('failQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory', () => {
it('should fail queued background jobs that exceed maximum attempts', async () => {
const job = createTestJob({
status: BackgroundJobStatus.Queued,
attempt: 2,
attempt: 3,
maxAttempt: 2
})
await storeBackgroundJob({ job })
const SUT = failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory({
db
const SUT =
failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory({
db
})
await SUT({ originServerUrl, jobType: 'fileImport' })
const updatedJob = await db(BackgroundJobs.name).where({ id: job.id }).first()
expect(updatedJob.status).to.equal(BackgroundJobStatus.Failed)
})
it('should fail queued background jobs with zero compute budget', async () => {
const job = createTestJob({
payload: {
jobType: 'fileImport',
payloadVersion: 1,
testData: 'complex-test-data'
},
remainingComputeBudgetSeconds: 0
})
await storeBackgroundJob({ job })
const SUT =
failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory({
db
})
await SUT({ originServerUrl, jobType: 'fileImport' })
const updatedJob = await db(BackgroundJobs.name).where({ id: job.id }).first()
expect(updatedJob.status).to.equal(BackgroundJobStatus.Failed)
})
it('should fail queued background jobs with negative compute budget', async () => {
const job = createTestJob({
payload: {
jobType: 'fileImport',
payloadVersion: 1,
testData: 'complex-test-data'
},
remainingComputeBudgetSeconds: -100
})
await storeBackgroundJob({ job })
const SUT =
failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory({
db
})
await SUT({ originServerUrl, jobType: 'fileImport' })
@@ -5,13 +5,13 @@ import type {
BackgroundJobConfig,
BackgroundJobPayload,
StoreBackgroundJob
} from '@/modules/backgroundjobs/domain'
import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain'
} from '@/modules/backgroundjobs/domain/types'
import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain/types'
describe('scheduleBackgroundJobFactory', () => {
const mockJobConfig: BackgroundJobConfig = {
maxAttempt: 3,
timeoutMs: 30000
remainingComputeBudgetSeconds: 30
}
interface TestJobPayload extends BackgroundJobPayload {
@@ -96,7 +96,9 @@ describe('scheduleBackgroundJobFactory', () => {
const result = await createBackgroundJob({ jobPayload: mockJobPayload })
expect(result.maxAttempt).to.equal(mockJobConfig.maxAttempt)
expect(result.timeoutMs).to.equal(mockJobConfig.timeoutMs)
expect(result.remainingComputeBudgetSeconds).to.equal(
mockJobConfig.remainingComputeBudgetSeconds
)
})
it('should preserve job payload', async () => {
@@ -1,10 +1,13 @@
import { getFileImportTimeLimitMinutes } from '@/modules/shared/helpers/envHelper'
import { TIME, TIME_MS } from '@speckle/shared'
export const FileUploadDatabaseEvents = {
Updated: 'file_import_update',
Started: 'file_import_started'
} as const
export const DelayBetweenFileImportRetriesMinutes = 5
export const NumberOfFileImportRetries = 5
export const NumberOfFileImportRetries = 3
export const BackgroundJobType = {
FileImport: 'fileImport'
} as const
@@ -18,3 +21,11 @@ export const BackgroundJobPayloadVersion = {
export type BackgroundJobPayloadVersion =
(typeof BackgroundJobPayloadVersion)[keyof typeof BackgroundJobPayloadVersion]
export const maximumAllowedQueuingProcessingAndRetryTimeMs = () => 1 * TIME_MS.day
// NumberOfFileImportRetries *
// (getFileImportTimeLimitMinutes() + DelayBetweenFileImportRetriesMinutes + 1) *
// TIME_MS.minute // allowing an extra minute for some buffer
export const singleAttemptMaximumProcessingTimeSeconds = () =>
getFileImportTimeLimitMinutes() * TIME.minute
@@ -7,7 +7,7 @@ import type { Optional } from '@speckle/shared'
import type { UploadResult } from '@/modules/blobstorage/domain/types'
import type {
FileImportResultPayload,
JobPayload
JobPayloadV1
} from '@speckle/shared/workers/fileimport'
export type GetFileInfo = (args: {
@@ -67,7 +67,7 @@ export type NotifyChangeInFileStatus = (params: {
}) => Promise<void>
export type ProcessFileImportResult = (params: {
jobId: string
blobId: string
jobResult: FileImportResultPayload
}) => Promise<void>
@@ -82,11 +82,11 @@ export type UpdateFileStatus = (params: {
export type UploadedFile = UploadResult & { userId: string }
export type FileImportMessage = Pick<
JobPayload,
JobPayloadV1,
'modelId' | 'projectId' | 'fileType' | 'fileName' | 'blobId'
> & { jobId: string; userId: string }
> & { userId: string }
export type ScheduleFileimportJob = (args: JobPayload) => Promise<void>
export type ScheduleFileimportJob = (args: JobPayloadV1) => Promise<void>
export type PushJobToFileImporter = (
args: { scheduleJob: ScheduleFileimportJob } & FileImportMessage
@@ -76,7 +76,7 @@ import { onFileImportResultFactory } from '@/modules/fileuploads/services/result
import type { FileImportResultPayload } from '@speckle/shared/workers/fileimport'
import { JobResultStatus } from '@speckle/shared/workers/fileimport'
import type { GraphQLContext } from '@/modules/shared/helpers/typeHelper'
import { updateBackgroundJobFactory } from '@/modules/backgroundjobs/repositories'
import { updateBackgroundJobFactory } from '@/modules/backgroundjobs/repositories/backgroundjobs'
import { configureClient } from '@/knexfile'
const { FF_NEXT_GEN_FILE_IMPORTER_ENABLED } = getFeatureFlags()
@@ -250,6 +250,8 @@ const fileUploadMutations: Resolvers['FileUploadMutations'] = {
if (!FF_NEXT_GEN_FILE_IMPORTER_ENABLED)
throw new MisconfiguredEnvironmentError('File import next gen is not enabled')
// NOTE: jobId in this context is actually the blobId of the uploaded file
// We keep the naming for backwards compatibility reasons
const { projectId, jobId, status, warnings, reason, result } = args.input
const userId = ctx.userId
if (!userId) {
@@ -294,7 +296,7 @@ const fileUploadMutations: Resolvers['FileUploadMutations'] = {
projectId,
streamId: projectId, //legacy
userId,
jobId
blobId: jobId
})
const projectDb = await getProjectDbClient({ projectId })
@@ -310,7 +312,7 @@ const fileUploadMutations: Resolvers['FileUploadMutations'] = {
})
await onFileImportResult({
jobId,
blobId: jobId,
jobResult
})
+36 -34
View File
@@ -49,6 +49,7 @@ const { FF_NEXT_GEN_FILE_IMPORTER_ENABLED, FF_RHINO_FILE_IMPORTER_ENABLED } =
getFeatureFlags()
const EveryMinute = '*/1 * * * *'
const EveryFiveMinutes = '*/5 * * * *'
const scheduledTasks: cron.ScheduledTask[] = []
@@ -110,46 +111,47 @@ export const init: SpeckleModule['init'] = async ({
scheduledTasks.push(
await scheduleBackgroundJobGarbageCollection({
queueDb,
scheduleExecution,
cronExpression: EveryFiveMinutes
})
)
} else {
// feature flag is not enabled
scheduledTasks.push(
await scheduleFileImportExpiry({
scheduleExecution,
cronExpression: EveryMinute
})
)
await listenFor(FileUploadDatabaseEvents.Updated, async (msg) => {
const parsedMessage = parseMessagePayload(msg.payload)
if (!parsedMessage.streamId) return
const projectDb = await getProjectDbClient({
projectId: parsedMessage.streamId
})
await onFileImportProcessedFactory({
getFileInfo: getFileInfoFactory({ db: projectDb }),
getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }),
updateFileUpload: updateFileUploadFactory({ db: projectDb }),
eventEmit: getEventBus().emit
})(parsedMessage)
})
await listenFor(FileUploadDatabaseEvents.Started, async (msg) => {
const parsedMessage = parseMessagePayload(msg.payload)
if (!parsedMessage.streamId) return
const projectDb = await getProjectDbClient({
projectId: parsedMessage.streamId
})
await onFileProcessingFactory({
getFileInfo: getFileInfoFactory({ db: projectDb }),
emitEvent: getEventBus().emit
})(parsedMessage)
})
}
scheduledTasks.push(
await scheduleFileImportExpiry({
scheduleExecution,
cronExpression: EveryMinute
})
)
await listenFor(FileUploadDatabaseEvents.Updated, async (msg) => {
const parsedMessage = parseMessagePayload(msg.payload)
if (!parsedMessage.streamId) return
const projectDb = await getProjectDbClient({
projectId: parsedMessage.streamId
})
await onFileImportProcessedFactory({
getFileInfo: getFileInfoFactory({ db: projectDb }),
getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }),
updateFileUpload: updateFileUploadFactory({ db: projectDb }),
eventEmit: getEventBus().emit
})(parsedMessage)
})
await listenFor(FileUploadDatabaseEvents.Started, async (msg) => {
const parsedMessage = parseMessagePayload(msg.payload)
if (!parsedMessage.streamId) return
const projectDb = await getProjectDbClient({
projectId: parsedMessage.streamId
})
await onFileProcessingFactory({
getFileInfo: getFileInfoFactory({ db: projectDb }),
emitEvent: getEventBus().emit
})(parsedMessage)
})
initializeEventListenersFactory({ db, observeResult })()
reportSubscriptionEventsFactory({
publish,
@@ -1,16 +1,12 @@
import {
getFileImportTimeLimitMinutes,
getServerOrigin
} from '@/modules/shared/helpers/envHelper'
import { getServerOrigin } from '@/modules/shared/helpers/envHelper'
import type { Logger } from '@/observability/logging'
import { TIME_MS } from '@speckle/shared'
import type { JobPayload } from '@speckle/shared/workers/fileimport'
import type { JobPayloadV1 } from '@speckle/shared/workers/fileimport'
import type { FileImportQueue } from '@/modules/fileuploads/domain/types'
import {
NumberOfFileImportRetries,
DelayBetweenFileImportRetriesMinutes,
BackgroundJobType,
BackgroundJobPayloadVersion
BackgroundJobPayloadVersion,
singleAttemptMaximumProcessingTimeSeconds
} from '@/modules/fileuploads/domain/consts'
import type { Knex } from 'knex'
import { migrateDbToLatest } from '@/db/migrations'
@@ -18,16 +14,11 @@ import { createBackgroundJobFactory } from '@/modules/backgroundjobs/services/cr
import {
getBackgroundJobCountFactory,
storeBackgroundJobFactory
} from '@/modules/backgroundjobs/repositories'
import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain'
} from '@/modules/backgroundjobs/repositories/backgroundjobs'
import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain/types'
export const fileImportQueues: FileImportQueue[] = []
const timeout =
NumberOfFileImportRetries *
(getFileImportTimeLimitMinutes() + DelayBetweenFileImportRetriesMinutes) *
TIME_MS.minute
export const initializePostgresQueue = async ({
label,
supportedFileTypes,
@@ -41,7 +32,10 @@ export const initializePostgresQueue = async ({
await migrateDbToLatest({ db, region: `Queue DB for ${label}` })
const createBackgroundJob = createBackgroundJobFactory({
jobConfig: { maxAttempt: 3, timeoutMs: timeout },
jobConfig: {
maxAttempt: NumberOfFileImportRetries,
remainingComputeBudgetSeconds: 2 * singleAttemptMaximumProcessingTimeSeconds()
},
storeBackgroundJob: storeBackgroundJobFactory({
db,
originServerUrl: getServerOrigin()
@@ -55,7 +49,7 @@ export const initializePostgresQueue = async ({
(type) => type.toLocaleLowerCase() // Normalize file types to lowercase (this is a safeguard to prevent stupid typos in the future)
),
shutdown: async () => {},
scheduleJob: async (jobData: JobPayload) => {
scheduleJob: async (jobData: JobPayloadV1) => {
await createBackgroundJob({
jobPayload: {
jobType: BackgroundJobType.FileImport,
@@ -1,12 +1,11 @@
import type { CreateAndStoreAppToken } from '@/modules/core/domain/tokens/operations'
import { DefaultAppIds } from '@/modules/auth/defaultApps'
import { Scopes, TIME, TIME_MS } from '@speckle/shared'
import { Scopes } from '@speckle/shared'
import { TokenResourceIdentifierType } from '@/modules/core/graph/generated/graphql'
import type { PushJobToFileImporter } from '@/modules/fileuploads/domain/operations'
import { getFileImportTimeLimitMinutes } from '@/modules/shared/helpers/envHelper'
import {
DelayBetweenFileImportRetriesMinutes,
NumberOfFileImportRetries
maximumAllowedQueuingProcessingAndRetryTimeMs,
singleAttemptMaximumProcessingTimeSeconds
} from '@/modules/fileuploads/domain/consts'
export const pushJobToFileImporterFactory =
@@ -21,18 +20,14 @@ export const pushJobToFileImporterFactory =
userId,
fileName,
fileType,
blobId,
jobId
blobId
}): Promise<void> => {
const token = await deps.createAppToken({
appId: DefaultAppIds.Web,
name: `fileimport-${projectId}@${modelId}`,
userId,
scopes: [Scopes.Streams.Write, Scopes.Streams.Read, Scopes.Profile.Read],
lifespan:
NumberOfFileImportRetries *
(getFileImportTimeLimitMinutes() + DelayBetweenFileImportRetriesMinutes + 1) *
TIME_MS.minute, // allowing an extra minute for some buffer
lifespan: maximumAllowedQueuingProcessingAndRetryTimeMs(),
limitResources: [
{
id: projectId,
@@ -42,14 +37,13 @@ export const pushJobToFileImporterFactory =
})
await scheduleJob({
jobId,
fileName,
token,
serverUrl: deps.getServerOrigin(),
modelId,
fileType,
projectId,
timeOutSeconds: getFileImportTimeLimitMinutes() * TIME.minute,
timeOutSeconds: singleAttemptMaximumProcessingTimeSeconds(),
blobId
})
}
@@ -58,7 +58,6 @@ export const insertNewUploadAndNotifyFactoryV2 =
projectId: file.projectId,
modelId: upload.modelId,
blobId: file.id,
jobId: file.id,
userId: upload.userId
})
@@ -16,13 +16,16 @@ import { FileuploadEvents } from '@/modules/fileuploads/domain/events'
import {
BackgroundJobStatus,
type UpdateBackgroundJob
} from '@/modules/backgroundjobs/domain'
import { JobResultStatus } from '@speckle/shared/workers/fileimport'
} from '@/modules/backgroundjobs/domain/types'
import {
type FileImportJobPayloadV1,
JobResultStatus
} from '@speckle/shared/workers/fileimport'
type OnFileImportResultDeps = {
getFileInfo: GetFileInfoV2
updateFileUpload: UpdateFileUpload
updateBackgroundJob: UpdateBackgroundJob
updateBackgroundJob: UpdateBackgroundJob<FileImportJobPayloadV1>
eventEmit: EventBusEmit
logger: Logger
FF_NEXT_GEN_FILE_IMPORTER_ENABLED: boolean
@@ -32,15 +35,15 @@ export const onFileImportResultFactory =
(deps: OnFileImportResultDeps): ProcessFileImportResult =>
async (params) => {
const { logger } = deps
const { jobId, jobResult } = params
const { blobId, jobResult } = params
const fileInfo = await deps.getFileInfo({ fileId: jobId })
const fileInfo = await deps.getFileInfo({ fileId: blobId })
if (!fileInfo) {
throw new FileImportJobNotFoundError(`File upload with ID ${jobId} not found`)
throw new FileImportJobNotFoundError(`File upload with ID ${blobId} not found`)
}
const boundLogger = logger.child({
jobId,
blobId,
fileId: fileInfo.id,
fileSize: fileInfo.fileSize,
fileName: fileInfo.fileName,
@@ -85,15 +88,14 @@ export const onFileImportResultFactory =
if (deps.FF_NEXT_GEN_FILE_IMPORTER_ENABLED) {
try {
await deps.updateBackgroundJob({
jobId,
payloadFilter: { blobId },
status: newStatusForBackgroundJob
})
} catch (e) {
const err = ensureError(e)
logger.error(
{ err },
'Error updating background job status in database. Job ID: %s',
jobId
{ err, blobId },
'Error updating background jobs status in database. Blob ID: {blobId}'
)
throw err
}
@@ -102,7 +104,7 @@ export const onFileImportResultFactory =
let updatedFile: FileUploadRecord
try {
updatedFile = await deps.updateFileUpload({
id: jobId,
id: blobId,
upload: {
convertedStatus: status,
convertedLastUpdate: new Date(),
@@ -118,9 +120,8 @@ export const onFileImportResultFactory =
} catch (e) {
const err = ensureError(e)
logger.error(
{ err },
'Error updating imported file status in database. File ID: %s',
jobId
{ err, info: { fileId: blobId } },
'Error updating imported file status in database. File ID: {fileId}'
)
throw err
}
@@ -139,7 +140,7 @@ export const onFileImportResultFactory =
await deps.eventEmit({
eventName: FileuploadEvents.Finished,
payload: {
jobId,
jobId: blobId,
jobResult
}
})
@@ -4,10 +4,9 @@ import type {
GarbageCollectPendingUploadedFiles,
NotifyChangeInFileStatus
} from '@/modules/fileuploads/domain/operations'
import type { FailQueuedBackgroundJobsWhichExceedMaximumAttempts } from '@/modules/backgroundjobs/domain'
import type { FailQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget } from '@/modules/backgroundjobs/domain/types'
import type { FileImportJobPayloadV1 } from '@speckle/shared/workers/fileimport'
import { BackgroundJobType } from '@/modules/fileuploads/domain/consts'
import { LogicError } from '@/modules/shared/errors'
export const manageFileImportExpiryFactory = (deps: {
garbageCollectExpiredPendingUploads: GarbageCollectPendingUploadedFiles
@@ -29,12 +28,12 @@ export const manageFileImportExpiryFactory = (deps: {
}
export const garbageCollectAttemptedFileImportBackgroundJobsFactory = (deps: {
failQueuedBackgroundJobsWhichExceedMaximumAttempts: FailQueuedBackgroundJobsWhichExceedMaximumAttempts<FileImportJobPayloadV1>
failQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget: FailQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget<FileImportJobPayloadV1>
failPendingUploadedFiles: FailPendingUploadedFiles
notifyUploadStatus: NotifyChangeInFileStatus
}): ((params: { logger: Logger; originServerUrl: string }) => Promise<void>) => {
const {
failQueuedBackgroundJobsWhichExceedMaximumAttempts,
failQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget,
failPendingUploadedFiles,
notifyUploadStatus
} = deps
@@ -42,30 +41,31 @@ export const garbageCollectAttemptedFileImportBackgroundJobsFactory = (deps: {
const { logger, originServerUrl } = params
const failedBackgroundJobs =
await failQueuedBackgroundJobsWhichExceedMaximumAttempts({
originServerUrl,
jobType: BackgroundJobType.FileImport
})
await failQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget(
{
originServerUrl,
jobType: BackgroundJobType.FileImport
}
)
logger.info(
`Found ${failedBackgroundJobs.length} background jobs which have exceeded maximum number of attempts`
{ numberOfFailedBackgroundJobs: failedBackgroundJobs.length },
'Found {numberOfFailedBackgroundJobs} background jobs which have exceeded maximum number of attempts or exceeded their compute budget'
)
if (failedBackgroundJobs.length === 0) {
return
}
const fileIds = failedBackgroundJobs.map((job) => job.payload.blobId)
if (fileIds.length !== failedBackgroundJobs.length || fileIds.some((id) => !id)) {
throw new LogicError(
'We do not have a valid file Id for all failed background jobs',
{
info: {
fileIds
}
}
)
const validFailedBackgroundJobs = failedBackgroundJobs.filter(
(job) => !!job.payload.blobId
)
if (validFailedBackgroundJobs.length !== failedBackgroundJobs.length) {
logger.warn('Some failed background jobs do not have a valid blob ID')
}
const fileIds = validFailedBackgroundJobs.map((job) => job.payload.blobId)
const updatedUploads = await failPendingUploadedFiles({
uploadIds: fileIds
})
@@ -1,15 +1,11 @@
import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management'
import { expireOldPendingUploadsFactory } from '@/modules/fileuploads/repositories/fileUploads'
import { db } from '@/db/knex'
import { getFileImportTimeLimitMinutes } from '@/modules/shared/helpers/envHelper'
import { getRegisteredDbClients } from '@/modules/multiregion/utils/dbSelector'
import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations'
import { manageFileImportExpiryFactory } from '@/modules/fileuploads/services/tasks'
import { TIME } from '@speckle/shared'
import {
DelayBetweenFileImportRetriesMinutes,
NumberOfFileImportRetries
} from '@/modules/fileuploads/domain/consts'
import { TIME_MS } from '@speckle/shared'
import { maximumAllowedQueuingProcessingAndRetryTimeMs } from '@/modules/fileuploads/domain/consts'
import { getEventBus } from '@/modules/shared/services/eventBus'
export const scheduleFileImportExpiry = async ({
@@ -44,11 +40,7 @@ export const scheduleFileImportExpiry = async ({
handler({
logger,
timeoutThresholdSeconds:
(NumberOfFileImportRetries *
(getFileImportTimeLimitMinutes() +
DelayBetweenFileImportRetriesMinutes) +
1) * // additional buffer of 1 minute
TIME.minute
maximumAllowedQueuingProcessingAndRetryTimeMs() / TIME_MS.second
})
)
)
@@ -5,7 +5,7 @@ import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/ope
import { getEventBus } from '@/modules/shared/services/eventBus'
import { garbageCollectAttemptedFileImportBackgroundJobsFactory } from '@/modules/fileuploads/services/tasks'
import { failPendingUploadedFilesFactory } from '@/modules/fileuploads/repositories/fileUploads'
import { failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory } from '@/modules/backgroundjobs/repositories'
import { failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory } from '@/modules/backgroundjobs/repositories/backgroundjobs'
import type { Knex } from 'knex'
import { getServerOrigin } from '@/modules/shared/helpers/envHelper'
@@ -25,10 +25,12 @@ export const scheduleBackgroundJobGarbageCollection = async ({
for (const projectDb of [db, ...regionClients]) {
perDbTask.push(
garbageCollectAttemptedFileImportBackgroundJobsFactory({
failQueuedBackgroundJobsWhichExceedMaximumAttempts:
failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory({
db: queueDb
}),
failQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget:
failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory(
{
db: queueDb
}
),
failPendingUploadedFiles: failPendingUploadedFilesFactory({ db: projectDb }),
notifyUploadStatus: notifyChangeInFileStatus({
eventEmit: getEventBus().emit
@@ -37,8 +37,7 @@ export const buildFileUploadMessage = (
fileType: cryptoRandomString({ length: 10 }),
fileName: cryptoRandomString({ length: 10 }),
blobId: cryptoRandomString({ length: 10 }),
userId: cryptoRandomString({ length: 10 }),
jobId: cryptoRandomString({ length: 10 })
userId: cryptoRandomString({ length: 10 })
}
return assign(defaults, override)
@@ -2,10 +2,10 @@ import { expect } from 'chai'
import { garbageCollectAttemptedFileImportBackgroundJobsFactory } from '@/modules/fileuploads/services/tasks'
import {
BackgroundJobs,
failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory,
failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory,
getBackgroundJobFactory,
storeBackgroundJobFactory
} from '@/modules/backgroundjobs/repositories'
} from '@/modules/backgroundjobs/repositories/backgroundjobs'
import { db } from '@/db/knex'
import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management'
import { getEventBus } from '@/modules/shared/services/eventBus'
@@ -19,7 +19,7 @@ import {
type BackgroundJobPayload,
BackgroundJobStatus,
type BackgroundJob
} from '@/modules/backgroundjobs/domain'
} from '@/modules/backgroundjobs/domain/types'
import type { FileImportJobPayloadV1 } from '@speckle/shared/workers/fileimport'
import cryptoRandomString from 'crypto-random-string'
import type { FileUploadRecordV2 } from '@/modules/fileuploads/helpers/types'
@@ -55,7 +55,7 @@ export const createTestJob = (
status: BackgroundJobStatus.Queued,
attempt: 0,
maxAttempt: 3,
timeoutMs: 30000,
remainingComputeBudgetSeconds: 300,
createdAt: new Date(),
updatedAt: new Date(),
...overrides
@@ -140,8 +140,8 @@ describe('File import garbage collection @fileuploads integration', () => {
describe('garbage collect file import background jobs', () => {
const SUT = garbageCollectAttemptedFileImportBackgroundJobsFactory({
failQueuedBackgroundJobsWhichExceedMaximumAttempts:
failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory({
failQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudget:
failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory({
db
}),
failPendingUploadedFiles: failPendingUploadedFilesFactory({
@@ -173,19 +173,19 @@ describe('File import garbage collection @fileuploads integration', () => {
projectId: projectOne.id,
modelId: modelOne.id
})
const queuedJobAtMaxAttempts = createTestJob({
const queuedJobExceedingMaxAttempts = createTestJob({
status: BackgroundJobStatus.Queued,
payload: createTestJobPayload({
testData: identifiableData,
blobId: fileTwo.fileId
}),
attempt: 3,
attempt: 4,
maxAttempt: 3
})
await saveUploadFile(fileOne)
await storeBackgroundJob({ job: processingJobAtMaxAttempts })
await saveUploadFile(fileTwo)
await storeBackgroundJob({ job: queuedJobAtMaxAttempts })
await storeBackgroundJob({ job: queuedJobExceedingMaxAttempts })
// ensure jobs are in the database and retrievable
const existing = await db(BackgroundJobs.name)
@@ -210,7 +210,9 @@ describe('File import garbage collection @fileuploads integration', () => {
expect(resultOne?.status).to.equal(BackgroundJobStatus.Processing)
// queued job should have been garbage collected
const resultTwo = await getBackgroundJob({ jobId: queuedJobAtMaxAttempts.id })
const resultTwo = await getBackgroundJob({
jobId: queuedJobExceedingMaxAttempts.id
})
expect(resultTwo?.status).to.equal(BackgroundJobStatus.Failed)
const fileOneResult = await getUploadFile({ fileId: fileOne.fileId })
@@ -20,7 +20,7 @@ import { pushJobToFileImporterFactory } from '@/modules/fileuploads/services/cre
import { assign, get } from 'lodash-es'
import { buildFileUploadMessage } from '@/modules/fileuploads/tests/helpers/creation'
import { getFeatureFlags } from '@speckle/shared/environment'
import type { JobPayload } from '@speckle/shared/workers/fileimport'
import type { JobPayloadV1 } from '@speckle/shared/workers/fileimport'
import type { EventBusEmit } from '@/modules/shared/services/eventBus'
import { FileuploadEvents } from '@/modules/fileuploads/domain/events'
import type { BranchRecord } from '@/modules/core/helpers/types'
@@ -196,8 +196,7 @@ describe('FileUploads @fileuploads', () => {
})
expect(usedUserId).to.equal(upload.userId)
const expected: JobPayload = {
jobId: upload.jobId,
const expected: JobPayloadV1 = {
fileName: upload.fileName,
token,
serverUrl: serverOrigin,
+15 -19
View File
@@ -1,26 +1,22 @@
import z from 'zod'
import { TIME } from '../../core/index.js'
const job = z.object({
jobId: z.string()
export const jobPayloadV1 = z.object({
serverUrl: z.string().url().describe('The url of the server'),
projectId: z.string(),
modelId: z.string(),
token: z.string(),
blobId: z.string(),
fileType: z.string(),
fileName: z.string(),
timeOutSeconds: z
.number()
.int()
.default(30 * TIME.minute)
.describe('The timeout for a single attempt of the job, in seconds.')
})
export const jobPayload = job.merge(
z.object({
serverUrl: z.string().url().describe('The url of the server'),
projectId: z.string(),
modelId: z.string(),
token: z.string(),
blobId: z.string(),
fileType: z.string(),
fileName: z.string(),
timeOutSeconds: z
.number()
.int()
.default(30 * TIME.minute)
})
)
export type JobPayload = z.infer<typeof jobPayload>
export type JobPayloadV1 = z.infer<typeof jobPayloadV1>
const baseFileImportResult = z.object({
durationSeconds: z
@@ -69,7 +65,7 @@ export const fileImportResultPayload = z.discriminatedUnion('status', [
export type FileImportResultPayload = z.infer<typeof fileImportResultPayload>
export type FileImportJobPayloadV1 = JobPayload & {
export type FileImportJobPayloadV1 = JobPayloadV1 & {
jobType: 'fileImport'
payloadVersion: 1
}