From 290fdb3e6df5c886465e7b95dc865bb6c5d962bd Mon Sep 17 00:00:00 2001 From: Kristaps Fabians Geikins Date: Mon, 30 Jun 2025 16:12:45 +0300 Subject: [PATCH] feat(server): dev-only endpoint for monitoring bull queues (#5000) * feat(server): dev-only endpoint for monitoring bull queues * cr comments --- packages/server/modules/core/index.ts | 6 ++- .../server/modules/core/rest/monitoring.ts | 51 +++++++++++++++++++ packages/server/modules/fileuploads/index.ts | 33 +----------- packages/server/modules/previews/index.ts | 24 +-------- packages/server/package.json | 2 +- packages/shared/src/queue/config.ts | 24 ++++++--- 6 files changed, 78 insertions(+), 62 deletions(-) create mode 100644 packages/server/modules/core/rest/monitoring.ts diff --git a/packages/server/modules/core/index.ts b/packages/server/modules/core/index.ts index 777ed8170..22d5299b1 100644 --- a/packages/server/modules/core/index.ts +++ b/packages/server/modules/core/index.ts @@ -44,6 +44,7 @@ import { } from '@/modules/stats/repositories' import { getServerTotalModelCountFactory } from '@/modules/core/services/branch/retrieval' import { getServerTotalVersionCountFactory } from '@/modules/core/services/commit/retrieval' +import { bullMonitoringRouterFactory } from '@/modules/core/rest/monitoring' let stopTestSubs: (() => void) | undefined = undefined @@ -123,7 +124,7 @@ const coreModule: SpeckleModule<{ })() } }, - async finalize() { + async finalize({ app }) { // Update server profile in mp await updateServerMixpanelProfileFactory({ getServerInfo: getCachedServerInfoFactory({ db }), @@ -135,6 +136,9 @@ const coreModule: SpeckleModule<{ getServerTotalVersionCount: getServerTotalVersionCountFactory(), logger: coreLogger })() + + // Run BullMQ monitor once the app is fully ready + app.use(bullMonitoringRouterFactory()) }, async shutdown() { await shutdownResultListener() diff --git a/packages/server/modules/core/rest/monitoring.ts b/packages/server/modules/core/rest/monitoring.ts new file mode 100644 index 000000000..9d9f434a3 --- /dev/null +++ b/packages/server/modules/core/rest/monitoring.ts @@ -0,0 +1,51 @@ +import { getServerOrigin } from '@/modules/shared/helpers/envHelper' +import { Router } from 'express' +import { ExpressAdapter } from '@bull-board/express' +import { createBullBoard } from '@bull-board/api' +import { BullAdapter } from '@bull-board/api/bullAdapter' +import { getActiveQueues } from '@speckle/shared/queue' +import { moduleLogger } from '@/observability/logging' +import { authMiddlewareCreator } from '@/modules/shared/middleware' +import { validateServerRoleBuilderFactory } from '@/modules/shared/authz' +import { getRolesFactory } from '@/modules/shared/repositories/roles' +import { db } from '@/db/knex' +import { Roles } from '@speckle/shared' + +/** + * Has to be invoked after all speckle modules are initialized, cause only then we have + * the full set of Bull queues registered. + */ +export const bullMonitoringRouterFactory = (): Router => { + const router = Router() + + const relativeUrl = '/api/admin/bull-jobs' + const url = new URL(relativeUrl, getServerOrigin()) + const queues = getActiveQueues() + moduleLogger.info( + `Initializing Bull monitoring UI with ${ + Object.keys(queues).length + } queues at ${url.toString()}` + ) + + const serverAdapter = new ExpressAdapter() + serverAdapter.setBasePath(relativeUrl) + createBullBoard({ + serverAdapter, + queues: Object.values(queues).map((q) => new BullAdapter(q)) + }) + + router.use( + relativeUrl, + // Admin only + async (req, res, next) => { + await authMiddlewareCreator([ + validateServerRoleBuilderFactory({ getRoles: getRolesFactory({ db }) })({ + requiredRole: Roles.Server.Admin + }) + ])(req, res, next) + }, + serverAdapter.getRouter() + ) + + return router +} diff --git a/packages/server/modules/fileuploads/index.ts b/packages/server/modules/fileuploads/index.ts index 4fcefbafa..f8764868e 100644 --- a/packages/server/modules/fileuploads/index.ts +++ b/packages/server/modules/fileuploads/index.ts @@ -34,7 +34,7 @@ import { } from '@/modules/core/repositories/scheduledTasks' import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations' import { manageFileImportExpiryFactory } from '@/modules/fileuploads/services/tasks' -import { Roles, TIME } from '@speckle/shared' +import { TIME } from '@speckle/shared' import { FileUploadDatabaseEvents } from '@/modules/fileuploads/domain/consts' import { fileuploadRouterFactory } from '@/modules/fileuploads/rest/router' import { nextGenFileImporterRouterFactory } from '@/modules/fileuploads/rest/nextGenRouter' @@ -45,11 +45,6 @@ import { fileImportQueues } from '@/modules/fileuploads/queues/fileimports' import { initializeEventListenersFactory } from '@/modules/fileuploads/events/eventListener' -import { createBullBoard } from 'bull-board' -import { BullMQAdapter } from 'bull-board/bullMQAdapter' -import { authMiddlewareCreator } from '@/modules/shared/middleware' -import { getRolesFactory } from '@/modules/shared/repositories/roles' -import { validateServerRoleBuilderFactory } from '@/modules/shared/authz' import { initializeMetrics, ObserveResult @@ -114,32 +109,8 @@ export const init: SpeckleModule['init'] = async ({ if (isInitial) { if (FF_NEXT_GEN_FILE_IMPORTER_ENABLED) { const rhinoQueue = await initializeRhinoQueue() - const rhinoRouter = createBullBoard([new BullMQAdapter(rhinoQueue.queue)]).router - app.use( - '/api/admin/fileimport-jobs/rhino', - async (req, res, next) => { - await authMiddlewareCreator([ - validateServerRoleBuilderFactory({ getRoles: getRolesFactory({ db }) })({ - requiredRole: Roles.Server.Admin - }) - ])(req, res, next) - }, - rhinoRouter - ) - const ifcQueue = await initializeIfcQueue() - const ifcRouter = createBullBoard([new BullMQAdapter(ifcQueue.queue)]).router - app.use( - '/api/admin/fileimport-jobs/ifc', - async (req, res, next) => { - await authMiddlewareCreator([ - validateServerRoleBuilderFactory({ getRoles: getRolesFactory({ db }) })({ - requiredRole: Roles.Server.Admin - }) - ])(req, res, next) - }, - ifcRouter - ) + ;({ observeResult } = initializeMetrics({ registers: [metricsRegister], requestQueues: [rhinoQueue, ifcQueue] diff --git a/packages/server/modules/previews/index.ts b/packages/server/modules/previews/index.ts index 1f3695777..78721e181 100644 --- a/packages/server/modules/previews/index.ts +++ b/packages/server/modules/previews/index.ts @@ -2,19 +2,13 @@ import { moduleLogger, previewLogger as logger } from '@/observability/logging' import { consumePreviewResultFactory } from '@/modules/previews/resultListener' -import { db } from '@/db/knex' import { disablePreviews, getPreviewServiceRedisUrl, getRedisUrl, getServerOrigin } from '@/modules/shared/helpers/envHelper' -import { createBullBoard } from 'bull-board' -import { BullMQAdapter } from 'bull-board/bullMQAdapter' -import { authMiddlewareCreator } from '@/modules/shared/middleware' -import { ensureError, Roles, TIME } from '@speckle/shared' -import { validateServerRoleBuilderFactory } from '@/modules/shared/authz' -import { getRolesFactory } from '@/modules/shared/repositories/roles' +import { ensureError, TIME } from '@speckle/shared' import { previewRouterFactory } from '@/modules/previews/rest/router' import type { SpeckleModule } from '@/modules/shared/helpers/typeHelper' import { @@ -100,22 +94,6 @@ export const init: SpeckleModule['init'] = async ({ previewResponseQueue }) - const router = createBullBoard([ - new BullMQAdapter(previewRequestQueue), - new BullMQAdapter(previewResponseQueue) - ]).router - app.use( - '/api/admin/preview-jobs', - async (req, res, next) => { - await authMiddlewareCreator([ - validateServerRoleBuilderFactory({ getRoles: getRolesFactory({ db }) })({ - requiredRole: Roles.Server.Admin - }) - ])(req, res, next) - }, - router - ) - const previewRouter = previewRouterFactory({ previewRequestQueue, responseQueueName diff --git a/packages/server/package.json b/packages/server/package.json index 17d087928..7da6a243c 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -50,6 +50,7 @@ "@aws-sdk/client-s3": "^3.276.0", "@aws-sdk/lib-storage": "^3.100.0", "@aws-sdk/s3-request-presigner": "3.352.0", + "@bull-board/express": "^4.2.2", "@godaddy/terminus": "^4.9.0", "@graphql-tools/mock": "^9.0.4", "@graphql-tools/schema": "^10.0.6", @@ -140,7 +141,6 @@ }, "devDependencies": { "@apollo/rover": "^0.23.0", - "@bull-board/express": "^4.2.2", "@faker-js/faker": "^8.4.1", "@graphql-codegen/cli": "^5.0.5", "@graphql-codegen/typed-document-node": "^5.1.1", diff --git a/packages/shared/src/queue/config.ts b/packages/shared/src/queue/config.ts index b2f302934..ec736a2b0 100644 --- a/packages/shared/src/queue/config.ts +++ b/packages/shared/src/queue/config.ts @@ -2,12 +2,14 @@ import Bull from 'bull' import { Redis } from 'ioredis' import { isRedisReady } from '../redis/isRedisReady.js' -// we're caching this here, so that there is one client for the app lifecycle - type ClientCache = Record +// we're caching this here, so that there is one client for the app lifecycle const clientCache: ClientCache = {} +// so we can get all active queues for monitoring +const queueCache: Record = {} + export const initializeQueue = async ({ queueName, redisUrl, @@ -57,13 +59,23 @@ export const initializeQueue = async ({ } } } + const newQueue = new Bull(queueName, opts) - // bull does not check if redis is ready... - // - // logger.info('Checking Redis connection is ready...') + queueCache[queueName] = newQueue + + // When newQueue closed, remove from cache + newQueue.on('close', () => { + delete queueCache[queueName] + }) + newQueue.client.on('end', () => { + delete queueCache[queueName] + }) + if (!clientCache[redisUrl].client) throw new Error('Redis client not properly initialized') + await isRedisReady(clientCache[redisUrl].client) - // await isRedisReady(clientCache[redisUrl].subscriber) return await newQueue.isReady() } + +export const getActiveQueues = () => ({ ...queueCache })