fix(server/fileuploads): prevent file upload jobs silently failing (#5327)

This commit is contained in:
Iain Sproat
2025-08-29 14:46:40 +01:00
committed by GitHub
parent cba989722b
commit fdf3b93e95
17 changed files with 636 additions and 115 deletions
@@ -16,7 +16,7 @@ from specklepy.core.api.inputs.file_import_inputs import (
from specklepy.core.api.models import Version
from ifc_importer.domain import FileimportPayload, JobStatus
from ifc_importer.repository import get_next_job, set_job_status, setup_connection
from ifc_importer.repository import get_next_job, return_job_to_queued, setup_connection
IDLE_TIMEOUT = 1
@@ -125,7 +125,8 @@ async def job_processor(logger: structlog.stdlib.BoundLogger):
),
)
)
# the server is responsible for moving successful jobs to the succeeded state
# mark it as succeeded so we do not enter any error handling routines on finalisation
job_status = JobStatus.SUCCEEDED
except TimeoutError as te:
@@ -142,7 +143,9 @@ async def job_processor(logger: structlog.stdlib.BoundLogger):
ex = e
job_status = JobStatus.FAILED
finally:
if job_status == JobStatus.FAILED:
if job_status == JobStatus.QUEUED:
await return_job_to_queued(connection, job_id)
elif job_status == JobStatus.FAILED:
# we should be reporting the failure to the server
logger.error("job processing failed", exc_info=ex)
try:
@@ -162,11 +165,13 @@ async def job_processor(logger: structlog.stdlib.BoundLogger):
),
)
)
# if the reporting of the failure does not succeed, we're requeueing
# unless we've reached the max attempts
# the server is responsible for moving failed jobs to the failed state, so the worker does not have to do anything further
except Exception as ex:
if attempt >= job.max_attempt:
job_status = JobStatus.FAILED
else:
job_status = JobStatus.QUEUED
await set_job_status(connection, job_id, job_status)
logger.error("failed to report job failure", exc_info=ex)
await return_job_to_queued(connection, job_id)
# The client will not pick up a queued job if it now has exceeded max attempts.
# The server is responsible for moving queued jobs which have exceeded maximum attempts to failed status.
elif job_status == JobStatus.SUCCEEDED:
# do nothing
# we expect the job to already be marked as succeeded in the database by the server (when the worker reported the results back to the server)
continue
@@ -33,6 +33,7 @@ async def get_next_job(connection: Connection) -> FileimportJob | None:
WHERE ( --queued job
payload ->> 'fileType' = 'ifc'
AND status = $2
AND "attempt" < "maxAttempt"
)
OR ( --timed job left on processing state
payload ->> 'fileType' = 'ifc'
@@ -55,6 +56,11 @@ async def get_next_job(connection: Connection) -> FileimportJob | None:
return FileimportJob.model_validate(dict(job))
async def return_job_to_queued(connection: Connection, job_id: str) -> None:
    """Put the job identified by ``job_id`` back into the queued status.

    Prints a trace line to stdout, then delegates the status change to
    ``set_job_status`` with ``JobStatus.QUEUED``.
    """
    trace_line = f"returning job: {job_id} to queued"
    print(trace_line)
    outcome = await set_job_status(connection, job_id, JobStatus.QUEUED)
    return outcome
async def set_job_status(
connection: Connection, job_id: str, job_status: JobStatus
) -> None:
@@ -1,9 +1,5 @@
import { z } from 'zod'
export const BackgroundJobType = {
FileImport: 'fileImport'
} as const
export const BackgroundJobStatus = {
Queued: 'queued',
Processing: 'processing', // this status does not exist in db
@@ -14,11 +10,8 @@ export const BackgroundJobStatus = {
export type BackgroundJobStatus =
(typeof BackgroundJobStatus)[keyof typeof BackgroundJobStatus]
export type BackgroundJobType =
(typeof BackgroundJobType)[keyof typeof BackgroundJobType]
export const BackgroundJobPayload = z.object({
jobType: z.nativeEnum(BackgroundJobType),
jobType: z.string(),
payloadVersion: z.number()
})
@@ -46,6 +39,16 @@ export type StoreBackgroundJob = (args: {
export type GetBackgroundJob<T extends BackgroundJobPayload = BackgroundJobPayload> =
(args: { jobId: string }) => Promise<BackgroundJob<T> | null>
/**
 * Operation scoped to one `originServerUrl` and one `jobType` that resolves
 * with a list of background jobs.
 *
 * The repository implementation of this operation marks queued jobs whose
 * `attempt` has reached `maxAttempt` as failed and resolves with the updated
 * rows.
 */
export type FailQueuedBackgroundJobsWhichExceedMaximumAttempts<
T extends BackgroundJobPayload = BackgroundJobPayload
> = (args: { originServerUrl: string; jobType: string }) => Promise<BackgroundJob<T>[]>
/**
 * Operation that sets the status of the background job identified by `jobId`.
 *
 * Resolves with a background job, or `null`; the repository implementation
 * returns `null` when no row matched the given id.
 */
export type UpdateBackgroundJob<T extends BackgroundJobPayload = BackgroundJobPayload> =
(args: {
jobId: string
status: BackgroundJobStatus
}) => Promise<BackgroundJob<T> | null>
export type GetBackgroundJobCount<
T extends BackgroundJobPayload = BackgroundJobPayload
> = (args: {
@@ -1,10 +1,15 @@
import type { Knex } from 'knex'
import type {
FailQueuedBackgroundJobsWhichExceedMaximumAttempts,
UpdateBackgroundJob
} from '@/modules/backgroundjobs/domain'
import {
type BackgroundJob,
type BackgroundJobPayload,
type GetBackgroundJob,
type GetBackgroundJobCount,
type StoreBackgroundJob
type StoreBackgroundJob,
BackgroundJobStatus
} from '@/modules/backgroundjobs/domain'
import { buildTableHelper } from '@/modules/core/dbSchema'
@@ -48,6 +53,48 @@ export const getBackgroundJobFactory =
return job ?? null
}
/**
 * Builds the repository operation that moves queued background jobs to the
 * failed status once their `attempt` count has reached `maxAttempt`.
 *
 * Only jobs matching the given `originServerUrl` and `jobType` are touched.
 * The operation resolves with the rows that were updated.
 */
export const failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory =
  <T extends BackgroundJobPayload = BackgroundJobPayload>({
    db
  }: {
    db: Knex
  }): FailQueuedBackgroundJobsWhichExceedMaximumAttempts<T> =>
  async ({ jobType, originServerUrl }) => {
    const col = BackgroundJobs.withoutTablePrefix.col
    // camel-case requires the column name to be wrapped in double quotes
    const maxAttemptColumn = db.raw('"maxAttempt"')

    const failedJobs = await tables
      .backgroundJobs(db)
      .where(col.originServerUrl, originServerUrl)
      .andWhere(col.status, BackgroundJobStatus.Queued)
      .andWhere(col.jobType, jobType)
      .andWhere(col.attempt, '>=', maxAttemptColumn)
      .orderBy(col.createdAt, 'desc')
      .update({
        [col.status]: BackgroundJobStatus.Failed
      })
      .returning<BackgroundJob<T>[]>('*')

    return failedJobs
  }
/**
 * Builds the repository operation that writes a new `status` onto the
 * background job with the given `jobId`.
 *
 * Resolves with the first updated row, or `null` when the update matched no
 * rows.
 */
export const updateBackgroundJobFactory =
  ({ db }: { db: Knex }): UpdateBackgroundJob =>
  async ({ jobId, status }) => {
    const [updatedJob] = await tables
      .backgroundJobs(db)
      .update({ status })
      .where({ id: jobId })
      .returning('*')

    // An empty result set means no job carries this id.
    return updatedJob ?? null
  }
export const getBackgroundJobCountFactory =
({ db }: { db: Knex }): GetBackgroundJobCount =>
async ({ status, jobType, minAttempts }) => {
@@ -3,7 +3,8 @@ import {
storeBackgroundJobFactory,
getBackgroundJobFactory,
BackgroundJobs,
getBackgroundJobCountFactory
getBackgroundJobCountFactory,
failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory
} from '@/modules/backgroundjobs/repositories'
import type {
BackgroundJob,
@@ -15,6 +16,31 @@ import { createRandomString } from '@/modules/core/helpers/testHelpers'
const originServerUrl = 'http://example.org'
type TestJobPayload = BackgroundJobPayload & {
jobType: 'fileImport'
payloadVersion: 1
testData: string
}
/**
 * Test fixture: a queued `fileImport` background job with a random id.
 * Any field supplied in `overrides` replaces the corresponding default.
 */
const createTestJob = (
  overrides: Partial<BackgroundJob<TestJobPayload>> = {}
): BackgroundJob<TestJobPayload> => {
  const defaults: BackgroundJob<TestJobPayload> = {
    id: createRandomString(10),
    jobType: 'fileImport',
    payload: {
      jobType: 'fileImport',
      payloadVersion: 1,
      testData: 'test-data-value'
    },
    status: BackgroundJobStatus.Queued,
    attempt: 0,
    maxAttempt: 3,
    timeoutMs: 30000,
    createdAt: new Date(),
    updatedAt: new Date()
  }

  return { ...defaults, ...overrides }
}
describe('Background Jobs repositories @backgroundjobs', () => {
const storeBackgroundJob = storeBackgroundJobFactory({
db,
@@ -23,31 +49,6 @@ describe('Background Jobs repositories @backgroundjobs', () => {
const getBackgroundJob = getBackgroundJobFactory({ db })
const getBackgroundJobCount = getBackgroundJobCountFactory({ db })
type TestJobPayload = BackgroundJobPayload & {
jobType: 'fileImport'
payloadVersion: 1
testData: string
}
const createTestJob = (
overrides: Partial<BackgroundJob<TestJobPayload>> = {}
): BackgroundJob<TestJobPayload> => ({
id: createRandomString(10),
jobType: 'fileImport',
payload: {
jobType: 'fileImport',
payloadVersion: 1,
testData: 'test-data-value'
},
status: BackgroundJobStatus.Queued,
attempt: 0,
maxAttempt: 3,
timeoutMs: 30000,
createdAt: new Date(),
updatedAt: new Date(),
...overrides
})
beforeEach(async () => {
// Clean up background jobs table
await db(BackgroundJobs.name).del()
@@ -171,4 +172,24 @@ describe('Background Jobs repositories @backgroundjobs', () => {
expect(count).to.equal(1)
})
})
// Repository-level check: a queued job whose attempt count has reached its
// maximum is moved to the failed status.
describe('failQueuedBackgroundJobsWhichExceedMaximumAttempts', () => {
it('should fail queued background jobs that exceed maximum attempts', async () => {
// Boundary case: attempt equals maxAttempt (the repository compares with '>=').
const job = createTestJob({
status: BackgroundJobStatus.Queued,
attempt: 2,
maxAttempt: 2
})
await storeBackgroundJob({ job })
const SUT = failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory({
db
})
await SUT({ originServerUrl, jobType: 'fileImport' })
// Read the row back directly from the table to verify the persisted status.
const updatedJob = await db(BackgroundJobs.name).where({ id: job.id }).first()
expect(updatedJob.status).to.equal(BackgroundJobStatus.Failed)
})
})
})
@@ -5,3 +5,16 @@ export const FileUploadDatabaseEvents = {
export const DelayBetweenFileImportRetriesMinutes = 5
export const NumberOfFileImportRetries = 5
/**
 * Job type identifiers used for background jobs created by the file uploads
 * module. The companion type is the union of the object's literal values.
 */
export const BackgroundJobType = {
FileImport: 'fileImport'
} as const
export type BackgroundJobType =
(typeof BackgroundJobType)[keyof typeof BackgroundJobType]
/**
 * Known versions of the file import background job payload. The companion
 * type is the union of the object's literal values.
 */
export const BackgroundJobPayloadVersion = {
v1: 1
} as const
export type BackgroundJobPayloadVersion =
(typeof BackgroundJobPayloadVersion)[keyof typeof BackgroundJobPayloadVersion]
@@ -58,6 +58,10 @@ export type GarbageCollectPendingUploadedFiles = (args: {
timeoutThresholdSeconds: number
}) => Promise<FileUploadRecord[]>
/**
 * Operation that takes a list of upload ids and resolves with file upload
 * records.
 *
 * The repository implementation sets matching uploads that are still queued
 * or converting to the error status and resolves with the updated records.
 */
export type FailPendingUploadedFiles = (args: {
uploadIds: string[]
}) => Promise<FileUploadRecord[]>
export type NotifyChangeInFileStatus = (params: {
file: FileUploadRecord
}) => Promise<void>
@@ -26,6 +26,7 @@ import {
import { throwIfAuthNotOk } from '@/modules/shared/helpers/errorHelper'
import {
fileImportServiceShouldUsePrivateObjectsServerUrl,
getFileImporterQueuePostgresUrl,
getFileUploadUrlExpiryMinutes,
getPrivateObjectsServerOrigin,
getServerOrigin,
@@ -75,6 +76,8 @@ import { onFileImportResultFactory } from '@/modules/fileuploads/services/result
import type { FileImportResultPayload } from '@speckle/shared/workers/fileimport'
import { JobResultStatus } from '@speckle/shared/workers/fileimport'
import type { GraphQLContext } from '@/modules/shared/helpers/typeHelper'
import { updateBackgroundJobFactory } from '@/modules/backgroundjobs/repositories'
import { configureClient } from '@/knexfile'
const { FF_NEXT_GEN_FILE_IMPORTER_ENABLED } = getFeatureFlags()
@@ -102,6 +105,11 @@ const getFileUploadModel = async (params: {
return null
}
// Knex client used for background job (queue) updates: when a file importer
// queue Postgres URL is configured, a client is configured for that URL;
// otherwise the `db` client is used.
// NOTE(review): this runs at module load and configures its own client —
// confirm it does not duplicate a client already created for the same URL elsewhere.
const fileImporterConnectionUri = getFileImporterQueuePostgresUrl()
const queueDb = fileImporterConnectionUri
? configureClient({ postgres: { connectionUri: fileImporterConnectionUri } }).public
: db
const fileUploadMutations: Resolvers['FileUploadMutations'] = {
async generateUploadUrl(_parent, args, ctx) {
const { projectId } = args.input
@@ -294,7 +302,11 @@ const fileUploadMutations: Resolvers['FileUploadMutations'] = {
logger: logger.child({ fileUploadStatus: status }),
updateFileUpload: updateFileUploadFactory({ db: projectDb }),
getFileInfo: getFileInfoFactoryV2({ db: projectDb }),
eventEmit: getEventBus().emit
updateBackgroundJob: updateBackgroundJobFactory({
db: queueDb
}),
eventEmit: getEventBus().emit,
FF_NEXT_GEN_FILE_IMPORTER_ENABLED
})
await onFileImportResult({
+24 -63
View File
@@ -1,5 +1,4 @@
import type cron from 'node-cron'
import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management'
import { moduleLogger } from '@/observability/logging'
import {
onFileImportProcessedFactory,
@@ -21,26 +20,16 @@ import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import { listenFor } from '@/modules/core/utils/dbNotificationListener'
import { getEventBus } from '@/modules/shared/services/eventBus'
import {
expireOldPendingUploadsFactory,
getFileInfoFactory,
updateFileUploadFactory
} from '@/modules/fileuploads/repositories/fileUploads'
import { db } from '@/db/knex'
import { getFileImportTimeLimitMinutes } from '@/modules/shared/helpers/envHelper'
import { getRegisteredDbClients } from '@/modules/multiregion/utils/dbSelector'
import { scheduleExecutionFactory } from '@/modules/core/services/taskScheduler'
import {
acquireTaskLockFactory,
releaseTaskLockFactory
} from '@/modules/core/repositories/scheduledTasks'
import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations'
import { manageFileImportExpiryFactory } from '@/modules/fileuploads/services/tasks'
import { TIME } from '@speckle/shared'
import {
DelayBetweenFileImportRetriesMinutes,
FileUploadDatabaseEvents,
NumberOfFileImportRetries
} from '@/modules/fileuploads/domain/consts'
import { FileUploadDatabaseEvents } from '@/modules/fileuploads/domain/consts'
import { fileuploadRouterFactory } from '@/modules/fileuploads/rest/router'
import {
shutdownQueues,
@@ -53,54 +42,15 @@ import { reportSubscriptionEventsFactory } from '@/modules/fileuploads/events/su
import { configureClient } from '@/knexfile'
import { MisconfiguredEnvironmentError } from '@/modules/shared/errors'
import { rhinoImporterSupportedFileExtensions } from '@speckle/shared/blobs'
import { scheduleFileImportExpiry } from '@/modules/fileuploads/tasks/expireFileImports'
import { scheduleBackgroundJobGarbageCollection } from '@/modules/fileuploads/tasks/garbageCollectBackgroundJobs'
const { FF_NEXT_GEN_FILE_IMPORTER_ENABLED, FF_RHINO_FILE_IMPORTER_ENABLED } =
getFeatureFlags()
let scheduledTasks: cron.ScheduledTask[] = []
const EveryMinute = '*/1 * * * *'
const scheduleFileImportExpiry = async ({
scheduleExecution
}: {
scheduleExecution: ScheduleExecution
}) => {
const fileImportExpiryHandlers: ReturnType<typeof manageFileImportExpiryFactory>[] =
[]
const regionClients = await getRegisteredDbClients()
for (const projectDb of [db, ...regionClients]) {
fileImportExpiryHandlers.push(
manageFileImportExpiryFactory({
garbageCollectExpiredPendingUploads: expireOldPendingUploadsFactory({
db: projectDb
}),
notifyUploadStatus: notifyChangeInFileStatus({
eventEmit: getEventBus().emit
})
})
)
}
const cronExpression = '*/5 * * * *' // every 5 minutes
return scheduleExecution(
cronExpression,
'FileImportExpiry',
async (_scheduledTime, { logger }) => {
await Promise.all(
fileImportExpiryHandlers.map((handler) =>
handler({
logger,
timeoutThresholdSeconds:
(NumberOfFileImportRetries *
(getFileImportTimeLimitMinutes() +
DelayBetweenFileImportRetriesMinutes) +
1) * // additional buffer of 1 minute
TIME.minute
})
)
)
}
)
}
const scheduledTasks: cron.ScheduledTask[] = []
export const init: SpeckleModule['init'] = async ({
app,
@@ -116,10 +66,14 @@ export const init: SpeckleModule['init'] = async ({
if (FF_NEXT_GEN_FILE_IMPORTER_ENABLED)
moduleLogger.info('📄 Next Gen File Importer is ENABLED')
const scheduleExecution = scheduleExecutionFactory({
acquireTaskLock: acquireTaskLockFactory({ db }),
releaseTaskLock: releaseTaskLockFactory({ db })
})
let observeResult: ObserveResult | undefined = undefined
if (isInitial) {
// this feature flag is going away soon
if (FF_NEXT_GEN_FILE_IMPORTER_ENABLED) {
moduleLogger.info('🗳️ Next Gen File importer is ENABLED')
const connectionUri = getFileImporterQueuePostgresUrl()
@@ -152,14 +106,22 @@ export const init: SpeckleModule['init'] = async ({
registers: [metricsRegister],
requestQueues
}))
scheduledTasks.push(
await scheduleBackgroundJobGarbageCollection({
queueDb,
scheduleExecution,
cronExpression: EveryMinute
})
)
}
const scheduleExecution = scheduleExecutionFactory({
acquireTaskLock: acquireTaskLockFactory({ db }),
releaseTaskLock: releaseTaskLockFactory({ db })
})
scheduledTasks = [await scheduleFileImportExpiry({ scheduleExecution })]
scheduledTasks.push(
await scheduleFileImportExpiry({
scheduleExecution,
cronExpression: EveryMinute
})
)
await listenFor(FileUploadDatabaseEvents.Updated, async (msg) => {
const parsedMessage = parseMessagePayload(msg.payload)
@@ -201,7 +163,6 @@ export const init: SpeckleModule['init'] = async ({
})()
}
// the two routers can be used independently and can both be enabled
app.use(fileuploadRouterFactory())
}
@@ -8,7 +8,9 @@ import type { JobPayload } from '@speckle/shared/workers/fileimport'
import type { FileImportQueue } from '@/modules/fileuploads/domain/types'
import {
NumberOfFileImportRetries,
DelayBetweenFileImportRetriesMinutes
DelayBetweenFileImportRetriesMinutes,
BackgroundJobType,
BackgroundJobPayloadVersion
} from '@/modules/fileuploads/domain/consts'
import type { Knex } from 'knex'
import { migrateDbToLatest } from '@/db/migrations'
@@ -17,7 +19,7 @@ import {
getBackgroundJobCountFactory,
storeBackgroundJobFactory
} from '@/modules/backgroundjobs/repositories'
import { BackgroundJobStatus, BackgroundJobType } from '@/modules/backgroundjobs/domain'
import { BackgroundJobStatus } from '@/modules/backgroundjobs/domain'
export const fileImportQueues: FileImportQueue[] = []
@@ -55,7 +57,11 @@ export const initializePostgresQueue = async ({
shutdown: async () => {},
scheduleJob: async (jobData: JobPayload) => {
await createBackgroundJob({
jobPayload: { jobType: 'fileImport', payloadVersion: 1, ...jobData }
jobPayload: {
jobType: BackgroundJobType.FileImport,
payloadVersion: BackgroundJobPayloadVersion.v1,
...jobData
}
})
},
metrics: {
@@ -11,7 +11,8 @@ import type {
GetModelUploadsItems,
GetModelUploadsBaseArgs,
GetModelUploadsTotalCount,
UpdateFileStatus
UpdateFileStatus,
FailPendingUploadedFiles
} from '@/modules/fileuploads/domain/operations'
import type {
FileUploadRecord,
@@ -182,6 +183,26 @@ export const expireOldPendingUploadsFactory =
return updatedRows
}
/**
 * Builds the repository operation that marks file uploads as errored.
 *
 * Only uploads whose id is in `uploadIds` and whose converted status is still
 * queued or converting are updated; the status, message and last-update
 * timestamp are written, and the updated records are returned.
 */
export const failPendingUploadedFilesFactory =
  (deps: { db: Knex }): FailPendingUploadedFiles =>
  async (params) => {
    const col = FileUploads.withoutTablePrefix.col
    const stillPendingStatuses = [
      FileUploadConvertedStatus.Queued,
      FileUploadConvertedStatus.Converting
    ]

    return await deps
      .db(FileUploads.name)
      .whereIn(col.id, params.uploadIds)
      .whereIn(col.convertedStatus, stillPendingStatuses)
      .update({
        [col.convertedStatus]: FileUploadConvertedStatus.Error,
        [col.convertedMessage]: 'File import job failed',
        [col.convertedLastUpdate]: deps.db.fn.now()
      })
      .returning<FileUploadRecord[]>('*')
  }
const getPendingUploadsBaseQueryFactory =
(deps: { db: Knex }) =>
(streamId: string, options?: Partial<{ ignoreOld: boolean; limit: number }>) => {
@@ -13,12 +13,19 @@ import type { FileUploadRecord } from '@/modules/fileuploads/helpers/types'
import { FileImportJobNotFoundError } from '@/modules/fileuploads/helpers/errors'
import type { EventBusEmit } from '@/modules/shared/services/eventBus'
import { FileuploadEvents } from '@/modules/fileuploads/domain/events'
import {
BackgroundJobStatus,
type UpdateBackgroundJob
} from '@/modules/backgroundjobs/domain'
import { JobResultStatus } from '@speckle/shared/workers/fileimport'
/**
 * Dependencies injected into `onFileImportResultFactory`.
 */
type OnFileImportResultDeps = {
getFileInfo: GetFileInfoV2
updateFileUpload: UpdateFileUpload
// Writes the resulting status onto the background job; only invoked when
// FF_NEXT_GEN_FILE_IMPORTER_ENABLED is true.
updateBackgroundJob: UpdateBackgroundJob
eventEmit: EventBusEmit
logger: Logger
FF_NEXT_GEN_FILE_IMPORTER_ENABLED: boolean
}
export const onFileImportResultFactory =
@@ -46,8 +53,10 @@ export const onFileImportResultFactory =
})
let convertedCommitId = null
let newStatusForBackgroundJob: BackgroundJobStatus = BackgroundJobStatus.Processing
switch (jobResult.status) {
case 'error':
case JobResultStatus.Error:
boundLogger.warn(
{
duration: jobResult.result.durationSeconds,
@@ -55,9 +64,11 @@ export const onFileImportResultFactory =
},
'Processing error result for file upload'
)
newStatusForBackgroundJob = BackgroundJobStatus.Failed
break
case 'success':
case JobResultStatus.Success:
convertedCommitId = jobResult.result.versionId
newStatusForBackgroundJob = BackgroundJobStatus.Succeeded
boundLogger.info(
{
duration: jobResult.result.durationSeconds,
@@ -71,6 +82,23 @@ export const onFileImportResultFactory =
const status = jobResultStatusToFileUploadStatus(jobResult.status)
const convertedMessage = jobResultToConvertedMessage(jobResult)
if (deps.FF_NEXT_GEN_FILE_IMPORTER_ENABLED) {
try {
await deps.updateBackgroundJob({
jobId,
status: newStatusForBackgroundJob
})
} catch (e) {
const err = ensureError(e)
logger.error(
{ err },
'Error updating background job status in database. Job ID: %s',
jobId
)
throw err
}
}
let updatedFile: FileUploadRecord
try {
updatedFile = await deps.updateFileUpload({
@@ -1,8 +1,13 @@
import type { Logger } from '@/observability/logging'
import type {
FailPendingUploadedFiles,
GarbageCollectPendingUploadedFiles,
NotifyChangeInFileStatus
} from '@/modules/fileuploads/domain/operations'
import type { FailQueuedBackgroundJobsWhichExceedMaximumAttempts } from '@/modules/backgroundjobs/domain'
import type { FileImportJobPayloadV1 } from '@speckle/shared/workers/fileimport'
import { BackgroundJobType } from '@/modules/fileuploads/domain/consts'
import { LogicError } from '@/modules/shared/errors'
export const manageFileImportExpiryFactory = (deps: {
garbageCollectExpiredPendingUploads: GarbageCollectPendingUploadedFiles
@@ -22,3 +27,53 @@ export const manageFileImportExpiryFactory = (deps: {
}
}
}
/**
 * Builds the task that cleans up file import background jobs which can no
 * longer be retried.
 *
 * The returned task:
 *  1. fails queued `fileImport` jobs for `originServerUrl` that have used up
 *     their attempts,
 *  2. fails the uploads referenced by those jobs (via `payload.blobId`),
 *  3. emits a status notification for every upload that was updated.
 *
 * @throws LogicError when a failed job does not carry a usable file id.
 */
export const garbageCollectAttemptedFileImportBackgroundJobsFactory = (deps: {
  failQueuedBackgroundJobsWhichExceedMaximumAttempts: FailQueuedBackgroundJobsWhichExceedMaximumAttempts<FileImportJobPayloadV1>
  failPendingUploadedFiles: FailPendingUploadedFiles
  notifyUploadStatus: NotifyChangeInFileStatus
}): ((params: { logger: Logger; originServerUrl: string }) => Promise<void>) => {
  const failExhaustedJobs = deps.failQueuedBackgroundJobsWhichExceedMaximumAttempts
  const failUploads = deps.failPendingUploadedFiles
  const notify = deps.notifyUploadStatus

  return async ({ logger, originServerUrl }) => {
    const exhaustedJobs = await failExhaustedJobs({
      originServerUrl,
      jobType: BackgroundJobType.FileImport
    })

    logger.info(
      `Found ${exhaustedJobs.length} background jobs which have exceeded maximum number of attempts`
    )
    if (exhaustedJobs.length === 0) return

    // Each job payload references its upload through the blob id.
    const fileIds = exhaustedJobs.map((job) => job.payload.blobId)
    const everyJobHasFileId =
      fileIds.length === exhaustedJobs.length && fileIds.every((id) => !!id)
    if (!everyJobHasFileId) {
      throw new LogicError(
        'We do not have a valid file Id for all failed background jobs',
        {
          info: {
            fileIds
          }
        }
      )
    }

    const updatedUploads = await failUploads({ uploadIds: fileIds })

    // Notifications are emitted one at a time, in the order the rows came back.
    for (const upload of updatedUploads) {
      await notify({ file: upload })
    }
  }
}
@@ -0,0 +1,57 @@
import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management'
import { expireOldPendingUploadsFactory } from '@/modules/fileuploads/repositories/fileUploads'
import { db } from '@/db/knex'
import { getFileImportTimeLimitMinutes } from '@/modules/shared/helpers/envHelper'
import { getRegisteredDbClients } from '@/modules/multiregion/utils/dbSelector'
import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations'
import { manageFileImportExpiryFactory } from '@/modules/fileuploads/services/tasks'
import { TIME } from '@speckle/shared'
import {
DelayBetweenFileImportRetriesMinutes,
NumberOfFileImportRetries
} from '@/modules/fileuploads/domain/consts'
import { getEventBus } from '@/modules/shared/services/eventBus'
/**
 * Schedules the 'FileImportExpiry' task on the given cron expression.
 *
 * One expiry handler is created for the main database and for every
 * registered region database; on each run all handlers are invoked
 * concurrently with the same timeout threshold.
 */
export const scheduleFileImportExpiry = async ({
  scheduleExecution,
  cronExpression
}: {
  scheduleExecution: ScheduleExecution
  cronExpression: string
}) => {
  const regionClients = await getRegisteredDbClients()
  const fileImportExpiryHandlers: ReturnType<typeof manageFileImportExpiryFactory>[] =
    [db, ...regionClients].map((projectDb) =>
      manageFileImportExpiryFactory({
        garbageCollectExpiredPendingUploads: expireOldPendingUploadsFactory({
          db: projectDb
        }),
        notifyUploadStatus: notifyChangeInFileStatus({
          eventEmit: getEventBus().emit
        })
      })
    )

  // Every retry may take the full import time limit plus the delay between
  // retries; one extra minute is added as a buffer.
  const computeTimeoutThreshold = () => {
    const minutesPerAttempt =
      getFileImportTimeLimitMinutes() + DelayBetweenFileImportRetriesMinutes
    return (NumberOfFileImportRetries * minutesPerAttempt + 1) * TIME.minute
  }

  return scheduleExecution(
    cronExpression,
    'FileImportExpiry',
    async (_scheduledTime, { logger }) => {
      const runs = fileImportExpiryHandlers.map((handler) =>
        handler({
          logger,
          timeoutThresholdSeconds: computeTimeoutThreshold()
        })
      )
      await Promise.all(runs)
    }
  )
}
@@ -0,0 +1,54 @@
import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management'
import { db } from '@/db/knex'
import { getRegisteredDbClients } from '@/modules/multiregion/utils/dbSelector'
import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { garbageCollectAttemptedFileImportBackgroundJobsFactory } from '@/modules/fileuploads/services/tasks'
import { failPendingUploadedFilesFactory } from '@/modules/fileuploads/repositories/fileUploads'
import { failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory } from '@/modules/backgroundjobs/repositories'
import type { Knex } from 'knex'
import { getServerOrigin } from '@/modules/shared/helpers/envHelper'
/**
 * Schedules the 'GarbageCollectBackgroundJobs' task on the given cron
 * expression.
 *
 * On each run, queued file import jobs in `queueDb` that have used up their
 * attempts are failed, the uploads they reference are failed in the main
 * database and in every registered region database, and a status
 * notification is emitted for each updated upload.
 *
 * @param queueDb Knex client for the database holding the background jobs.
 * @param scheduleExecution Scheduler used to register the task.
 * @param cronExpression Cron expression controlling how often the task runs.
 */
export const scheduleBackgroundJobGarbageCollection = async ({
  queueDb,
  scheduleExecution,
  cronExpression
}: {
  queueDb: Knex
  scheduleExecution: ScheduleExecution
  cronExpression: string
}) => {
  const regionClients = await getRegisteredDbClients()
  const failPendingUploadsPerDb = [db, ...regionClients].map((projectDb) =>
    failPendingUploadedFilesFactory({ db: projectDb })
  )

  // The queue lives in a single database, so the "fail exhausted jobs" update
  // must run exactly once per execution. Running it once per project database
  // would hand the failed jobs to whichever update ran first, and only that
  // database's uploads would be failed; uploads stored elsewhere would stay
  // pending. Instead, the upload update fans out across all project databases.
  const garbageCollect = garbageCollectAttemptedFileImportBackgroundJobsFactory({
    failQueuedBackgroundJobsWhichExceedMaximumAttempts:
      failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory({
        db: queueDb
      }),
    failPendingUploadedFiles: async (params) => {
      const updatedPerDb = await Promise.all(
        failPendingUploadsPerDb.map((failPendingUploads) => failPendingUploads(params))
      )
      return updatedPerDb.flat()
    },
    notifyUploadStatus: notifyChangeInFileStatus({
      eventEmit: getEventBus().emit
    })
  })

  return scheduleExecution(
    cronExpression,
    'GarbageCollectBackgroundJobs',
    async (_scheduledTime, { logger }) => {
      await garbageCollect({
        logger,
        originServerUrl: getServerOrigin()
      })
    }
  )
}
@@ -0,0 +1,223 @@
import { expect } from 'chai'
import { garbageCollectAttemptedFileImportBackgroundJobsFactory } from '@/modules/fileuploads/services/tasks'
import {
BackgroundJobs,
failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory,
getBackgroundJobFactory,
storeBackgroundJobFactory
} from '@/modules/backgroundjobs/repositories'
import { db } from '@/db/knex'
import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management'
import { getEventBus } from '@/modules/shared/services/eventBus'
import {
failPendingUploadedFilesFactory,
getFileInfoFactoryV2,
saveUploadFileFactoryV2
} from '@/modules/fileuploads/repositories/fileUploads'
import { testLogger } from '@/observability/logging'
import {
type BackgroundJobPayload,
BackgroundJobStatus,
type BackgroundJob
} from '@/modules/backgroundjobs/domain'
import type { FileImportJobPayloadV1 } from '@speckle/shared/workers/fileimport'
import cryptoRandomString from 'crypto-random-string'
import type { FileUploadRecordV2 } from '@/modules/fileuploads/helpers/types'
import {
type BasicTestStream,
createTestStream
} from '@/test/speckle-helpers/streamHelper'
import { type BasicTestUser, createTestUser } from '@/test/authHelper'
import type { BasicTestBranch } from '@/test/speckle-helpers/branchHelper'
import { createTestBranch } from '@/test/speckle-helpers/branchHelper'
import { FileUploadConvertedStatus } from '@speckle/shared/blobs'
const originServerUrl = 'https://example.org'
export type TestJobPayload = BackgroundJobPayload & {
jobType: 'fileImport'
payloadVersion: 1
blobId: string
testData: string
}
/**
 * Test fixture: a queued `fileImport` background job with a random id and a
 * random `blobId` in its payload. Any field supplied in `overrides` replaces
 * the corresponding default.
 */
export const createTestJob = (
  overrides: Partial<BackgroundJob<TestJobPayload>> = {}
): BackgroundJob<TestJobPayload> => {
  const defaults: BackgroundJob<TestJobPayload> = {
    id: cryptoRandomString({ length: 10 }),
    jobType: 'fileImport',
    payload: {
      jobType: 'fileImport',
      payloadVersion: 1,
      blobId: cryptoRandomString({ length: 10 }),
      testData: 'test-data-value'
    },
    status: BackgroundJobStatus.Queued,
    attempt: 0,
    maxAttempt: 3,
    timeoutMs: 30000,
    createdAt: new Date(),
    updatedAt: new Date()
  }

  return { ...defaults, ...overrides }
}
/**
 * Test fixture: a version 1 `fileImport` payload with random `blobId` and
 * `testData`; fields in `overrides` take precedence over the defaults.
 */
const createTestJobPayload = (overrides: Partial<TestJobPayload>): TestJobPayload => {
  const defaults: TestJobPayload = {
    jobType: 'fileImport',
    payloadVersion: 1,
    blobId: cryptoRandomString({ length: 10 }),
    testData: cryptoRandomString({ length: 100 })
  }

  return { ...defaults, ...overrides }
}
/**
 * Test fixture: the input for saving a file upload, filled with random
 * values; fields in `overrides` take precedence over the defaults.
 */
const createTestFileUpload = (
  overrides: Partial<
    Pick<
      FileUploadRecordV2,
      'projectId' | 'userId' | 'fileName' | 'fileType' | 'fileSize'
    > & { fileId: string; modelId: string; modelName: string }
  >
) => {
  const randomValues = {
    projectId: cryptoRandomString({ length: 10 }),
    userId: cryptoRandomString({ length: 10 }),
    fileName: cryptoRandomString({ length: 10 }),
    fileType: cryptoRandomString({ length: 10 }),
    fileSize: Math.floor(Math.random() * 10_000),
    fileId: cryptoRandomString({ length: 10 }),
    modelId: cryptoRandomString({ length: 10 }),
    modelName: cryptoRandomString({ length: 10 })
  }

  return { ...randomValues, ...overrides }
}
type StoredBackgroundJob = BackgroundJob<FileImportJobPayloadV1> & {
originServerUrl: string
}
// Integration test for the file import background job garbage collection
// task, wired to the real repositories against the test database.
describe('File import garbage collection @fileuploads integration', () => {
const storeBackgroundJob = storeBackgroundJobFactory({
db,
originServerUrl
})
const getBackgroundJob = getBackgroundJobFactory({ db })
const saveUploadFile = saveUploadFileFactoryV2({ db })
const getUploadFile = getFileInfoFactoryV2({ db })
let userOne: BasicTestUser
let projectOne: BasicTestStream
let modelOne: BasicTestBranch
// Shared fixtures: one user owning one project that contains one model.
before(async () => {
userOne = await createTestUser({
id: '',
email: cryptoRandomString({ length: 10 }) + '@example.org',
name: cryptoRandomString({ length: 10 })
})
projectOne = await createTestStream(
{
id: '',
name: cryptoRandomString({ length: 10 }),
ownerId: userOne.id
},
userOne
)
modelOne = await createTestBranch({
branch: {
id: '',
name: cryptoRandomString({ length: 10 }),
authorId: userOne.id,
streamId: projectOne.id
},
stream: projectOne,
owner: userOne
})
})
beforeEach(async () => {
// Clean up background jobs table
await db(BackgroundJobs.name).del()
})
describe('garbage collect file import background jobs', () => {
// System under test: here the same `db` client backs both the job queue
// and the file uploads.
const SUT = garbageCollectAttemptedFileImportBackgroundJobsFactory({
failQueuedBackgroundJobsWhichExceedMaximumAttempts:
failQueuedBackgroundJobsWhichExceedMaximumAttemptsFactory({
db
}),
failPendingUploadedFiles: failPendingUploadedFilesFactory({
db
}),
notifyUploadStatus: notifyChangeInFileStatus({
eventEmit: getEventBus().emit
})
})
it('should garbage collect failed background jobs', async () => {
// Marker stored in both payloads so the two jobs can be queried back together.
const identifiableData = cryptoRandomString({ length: 10 })
const fileOne = createTestFileUpload({
projectId: projectOne.id,
modelId: modelOne.id
})
// Job at max attempts but in processing status: expected to be left alone.
const processingJobAtMaxAttempts = createTestJob({
status: BackgroundJobStatus.Processing,
payload: createTestJobPayload({
testData: identifiableData,
blobId: fileOne.fileId
}),
attempt: 3,
maxAttempt: 3
})
const fileTwo = createTestFileUpload({
projectId: projectOne.id,
modelId: modelOne.id
})
// Job at max attempts and still queued: expected to be failed.
const queuedJobAtMaxAttempts = createTestJob({
status: BackgroundJobStatus.Queued,
payload: createTestJobPayload({
testData: identifiableData,
blobId: fileTwo.fileId
}),
attempt: 3,
maxAttempt: 3
})
await saveUploadFile(fileOne)
await storeBackgroundJob({ job: processingJobAtMaxAttempts })
await saveUploadFile(fileTwo)
await storeBackgroundJob({ job: queuedJobAtMaxAttempts })
// ensure jobs are in the database and retrievable
const existing = await db(BackgroundJobs.name)
.whereJsonSupersetOf(BackgroundJobs.withoutTablePrefix.col.payload, {
testData: identifiableData
})
.select<StoredBackgroundJob[]>('*')
expect(existing).to.have.length(2)
expect(
existing.filter((j) => j.status === BackgroundJobStatus.Queued),
JSON.stringify(existing)
).to.have.length(1)
expect(
existing.filter((j) => j.status === BackgroundJobStatus.Processing),
JSON.stringify(existing)
).to.have.length(1)
await SUT({ logger: testLogger, originServerUrl })
// processing job should not have been garbage collected
const resultOne = await getBackgroundJob({ jobId: processingJobAtMaxAttempts.id })
expect(resultOne?.status).to.equal(BackgroundJobStatus.Processing)
// queued job should have been garbage collected
const resultTwo = await getBackgroundJob({ jobId: queuedJobAtMaxAttempts.id })
expect(resultTwo?.status).to.equal(BackgroundJobStatus.Failed)
// The upload tied to the untouched job is expected to still be queued; the
// upload tied to the failed job is expected to be in the error status.
const fileOneResult = await getUploadFile({ fileId: fileOne.fileId })
const fileTwoResult = await getUploadFile({ fileId: fileTwo.fileId })
expect(fileOneResult?.convertedStatus).to.equal(FileUploadConvertedStatus.Queued)
expect(fileTwoResult?.convertedStatus).to.equal(FileUploadConvertedStatus.Error)
})
})
})
@@ -68,3 +68,8 @@ export const fileImportResultPayload = z.discriminatedUnion('status', [
])
export type FileImportResultPayload = z.infer<typeof fileImportResultPayload>
export type FileImportJobPayloadV1 = JobPayload & {
jobType: 'fileImport'
payloadVersion: 1
}