Merge branch 'main' into andrew/filtering-followup-improvements
This commit is contained in:
@@ -174,7 +174,7 @@ describe('Background Jobs repositories @backgroundjobs', () => {
|
||||
})
|
||||
|
||||
describe('failQueuedBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory', () => {
|
||||
it('should fail queued background jobs that meet or exceed maximum attempts', async () => {
|
||||
it('should fail queued background jobs that equal maximum attempts', async () => {
|
||||
const job = createTestJob({
|
||||
status: BackgroundJobStatus.Queued,
|
||||
attempt: 2,
|
||||
@@ -193,6 +193,101 @@ describe('Background Jobs repositories @backgroundjobs', () => {
|
||||
expect(updatedJob.status).to.equal(BackgroundJobStatus.Failed)
|
||||
})
|
||||
|
||||
it('should not fail processing background jobs that meet the maximum attempts', async () => {
|
||||
const job = createTestJob({
|
||||
status: BackgroundJobStatus.Processing,
|
||||
attempt: 1,
|
||||
maxAttempt: 2
|
||||
})
|
||||
await storeBackgroundJob({ job })
|
||||
|
||||
const SUT =
|
||||
failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory({
|
||||
db
|
||||
})
|
||||
|
||||
await SUT({ originServerUrl, jobType: 'fileImport' })
|
||||
|
||||
const updatedJob = await db(BackgroundJobs.name).where({ id: job.id }).first()
|
||||
expect(updatedJob.status).to.equal(BackgroundJobStatus.Processing)
|
||||
})
|
||||
|
||||
it('should fail queued background jobs that exceed maximum attempts', async () => {
|
||||
const job = createTestJob({
|
||||
status: BackgroundJobStatus.Queued,
|
||||
attempt: 3,
|
||||
maxAttempt: 2
|
||||
})
|
||||
await storeBackgroundJob({ job })
|
||||
|
||||
const SUT =
|
||||
failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory({
|
||||
db
|
||||
})
|
||||
|
||||
await SUT({ originServerUrl, jobType: 'fileImport' })
|
||||
|
||||
const updatedJob = await db(BackgroundJobs.name).where({ id: job.id }).first()
|
||||
expect(updatedJob.status).to.equal(BackgroundJobStatus.Failed)
|
||||
})
|
||||
|
||||
it('should fail processing background jobs that exceed maximum attempts', async () => {
|
||||
const job = createTestJob({
|
||||
status: BackgroundJobStatus.Processing,
|
||||
attempt: 3,
|
||||
maxAttempt: 2
|
||||
})
|
||||
await storeBackgroundJob({ job })
|
||||
|
||||
const SUT =
|
||||
failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory({
|
||||
db
|
||||
})
|
||||
|
||||
await SUT({ originServerUrl, jobType: 'fileImport' })
|
||||
|
||||
const updatedJob = await db(BackgroundJobs.name).where({ id: job.id }).first()
|
||||
expect(updatedJob.status).to.equal(BackgroundJobStatus.Failed)
|
||||
})
|
||||
|
||||
it('should not fail queued background jobs that are within the maximum attempts', async () => {
|
||||
const job = createTestJob({
|
||||
status: BackgroundJobStatus.Queued,
|
||||
attempt: 1,
|
||||
maxAttempt: 2
|
||||
})
|
||||
await storeBackgroundJob({ job })
|
||||
|
||||
const SUT =
|
||||
failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory({
|
||||
db
|
||||
})
|
||||
|
||||
await SUT({ originServerUrl, jobType: 'fileImport' })
|
||||
|
||||
const updatedJob = await db(BackgroundJobs.name).where({ id: job.id }).first()
|
||||
expect(updatedJob.status).to.equal(BackgroundJobStatus.Queued)
|
||||
})
|
||||
|
||||
it('should not fail processing background jobs that are within the maximum attempts', async () => {
|
||||
const job = createTestJob({
|
||||
status: BackgroundJobStatus.Processing,
|
||||
attempt: 1,
|
||||
maxAttempt: 2
|
||||
})
|
||||
await storeBackgroundJob({ job })
|
||||
|
||||
const SUT =
|
||||
failBackgroundJobsWhichExceedMaximumAttemptsOrNoRemainingComputeBudgetFactory({
|
||||
db
|
||||
})
|
||||
|
||||
await SUT({ originServerUrl, jobType: 'fileImport' })
|
||||
|
||||
const updatedJob = await db(BackgroundJobs.name).where({ id: job.id }).first()
|
||||
expect(updatedJob.status).to.equal(BackgroundJobStatus.Processing)
|
||||
})
|
||||
|
||||
it('should fail queued background jobs with zero compute budget', async () => {
|
||||
const job = createTestJob({
|
||||
payload: {
|
||||
|
||||
@@ -57,6 +57,28 @@ export const garbageCollectAttemptedFileImportBackgroundJobsFactory = (deps: {
|
||||
return
|
||||
}
|
||||
|
||||
const failedJobsDueToNoComputeBudget = failedBackgroundJobs.filter(
|
||||
(job) =>
|
||||
job.remainingComputeBudgetSeconds !== null &&
|
||||
job.remainingComputeBudgetSeconds <= 0
|
||||
)
|
||||
if (failedJobsDueToNoComputeBudget.length > 0) {
|
||||
logger.info(
|
||||
{ numberOfFailedBackgroundJobs: failedBackgroundJobs.length },
|
||||
'Found {numberOfFailedBackgroundJobs} background jobs which have exceeded their compute budget'
|
||||
)
|
||||
}
|
||||
|
||||
const failedJobsDueToExceededAttempts = failedBackgroundJobs.filter(
|
||||
(job) => job.attempt >= job.maxAttempt
|
||||
)
|
||||
if (failedJobsDueToExceededAttempts.length > 0) {
|
||||
logger.warn(
|
||||
{ numberOfFailedBackgroundJobs: failedBackgroundJobs.length },
|
||||
'Found {numberOfFailedBackgroundJobs} background jobs which have exceeded maximum number of attempts'
|
||||
)
|
||||
}
|
||||
|
||||
const validFailedBackgroundJobs = failedBackgroundJobs.filter(
|
||||
(job) => !!job.payload.blobId
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user