fix(preview service): check bull queue is ready before processing
This commit is contained in:
@@ -21,6 +21,7 @@ import { logger } from '@/logging.js'
|
||||
import { jobProcessor } from '@/jobProcessor.js'
|
||||
import { AppState } from '@/const.js'
|
||||
import { initMetrics, initPrometheusRegistry } from '@/metrics.js'
|
||||
import { ensureError } from '@speckle/shared'
|
||||
|
||||
const app = express()
|
||||
const host = HOST
|
||||
@@ -66,7 +67,7 @@ const opts = {
|
||||
}
|
||||
}
|
||||
}
|
||||
const jobQueue = new Bull(JobQueueName, opts)
|
||||
let jobQueue: Bull.Queue | undefined = undefined
|
||||
|
||||
// store this callback, so on shutdown we can error the job
|
||||
let currentJob: { logger: Logger; done: Bull.DoneCallback } | undefined = undefined
|
||||
@@ -112,6 +113,16 @@ const server = app.listen(port, host, async () => {
|
||||
handleSIGTERM: false
|
||||
})
|
||||
}
|
||||
|
||||
try {
|
||||
const newQueue = new Bull(JobQueueName, opts)
|
||||
jobQueue = await newQueue.isReady()
|
||||
} catch (e) {
|
||||
const err = ensureError(e, 'Unknown error creating job queue')
|
||||
logger.error({ err }, 'Error creating job queue')
|
||||
throw err
|
||||
}
|
||||
|
||||
logger.debug(`Starting processing of "${JobQueueName}" message queue`)
|
||||
|
||||
// nothing after this line is getting called, this blocks
|
||||
@@ -185,10 +196,12 @@ const beforeShutdown = async () => {
|
||||
logger.info('🛑 Beginning shut down, pausing all jobs')
|
||||
appState = AppState.SHUTTINGDOWN
|
||||
// stop accepting new jobs and kill any running jobs
|
||||
await jobQueue.pause(
|
||||
true, // just pausing this local worker of the queue
|
||||
true // do not wait for active jobs to finish
|
||||
)
|
||||
if (jobQueue) {
|
||||
await jobQueue.pause(
|
||||
true, // just pausing this local worker of the queue
|
||||
true // do not wait for active jobs to finish
|
||||
)
|
||||
}
|
||||
|
||||
if (currentJob) {
|
||||
currentJob.logger.warn('Cancelling job due to preview-service shutdown')
|
||||
@@ -217,6 +230,10 @@ createTerminus(server, {
|
||||
return Promise.reject(new Error('Preview service is shutting down'))
|
||||
}
|
||||
|
||||
if (!jobQueue) {
|
||||
return Promise.reject(new Error('Job queue is not initialized'))
|
||||
}
|
||||
|
||||
const isReady = await jobQueue.isReady()
|
||||
if (!isReady)
|
||||
return Promise.reject(
|
||||
|
||||
Reference in New Issue
Block a user