From af1acf398378665cf64dcd55a3f241d29e908df0 Mon Sep 17 00:00:00 2001 From: Iain Sproat <68657+iainsproat@users.noreply.github.com> Date: Fri, 11 Apr 2025 12:02:03 +0100 Subject: [PATCH] fix(preview service): check bull queue is ready before processing --- packages/preview-service/src/main.ts | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/packages/preview-service/src/main.ts b/packages/preview-service/src/main.ts index ecd56225a..6f70e016d 100644 --- a/packages/preview-service/src/main.ts +++ b/packages/preview-service/src/main.ts @@ -21,6 +21,7 @@ import { logger } from '@/logging.js' import { jobProcessor } from '@/jobProcessor.js' import { AppState } from '@/const.js' import { initMetrics, initPrometheusRegistry } from '@/metrics.js' +import { ensureError } from '@speckle/shared' const app = express() const host = HOST @@ -66,7 +67,7 @@ const opts = { } } } -const jobQueue = new Bull(JobQueueName, opts) +let jobQueue: Bull.Queue | undefined = undefined // store this callback, so on shutdown we can error the job let currentJob: { logger: Logger; done: Bull.DoneCallback } | undefined = undefined @@ -112,6 +113,16 @@ const server = app.listen(port, host, async () => { handleSIGTERM: false }) } + + try { + const newQueue = new Bull(JobQueueName, opts) + jobQueue = await newQueue.isReady() + } catch (e) { + const err = ensureError(e, 'Unknown error creating job queue') + logger.error({ err }, 'Error creating job queue') + throw err + } + logger.debug(`Starting processing of "${JobQueueName}" message queue`) // nothing after this line is getting called, this blocks @@ -185,10 +196,12 @@ const beforeShutdown = async () => { logger.info('🛑 Beginning shut down, pausing all jobs') appState = AppState.SHUTTINGDOWN // stop accepting new jobs and kill any running jobs - await jobQueue.pause( - true, // just pausing this local worker of the queue - true // do not wait for active jobs to finish - ) + if (jobQueue) { + await jobQueue.pause( + true, // just pausing this local worker of the queue + true // do not wait for active jobs to finish + ) + } if (currentJob) { currentJob.logger.warn('Cancelling job due to preview-service shutdown') @@ -217,6 +230,10 @@ createTerminus(server, { return Promise.reject(new Error('Preview service is shutting down')) } + if (!jobQueue) { + return Promise.reject(new Error('Job queue is not initialized')) + } + const isReady = await jobQueue.isReady() if (!isReady) return Promise.reject(