feat(server): dev-only endpoint for monitoring bull queues (#5000)

* feat(server): dev-only endpoint for monitoring bull queues

* cr comments
This commit is contained in:
Kristaps Fabians Geikins
2025-06-30 16:12:45 +03:00
committed by GitHub
parent a6c473f682
commit 290fdb3e6d
6 changed files with 78 additions and 62 deletions
+5 -1
View File
@@ -44,6 +44,7 @@ import {
} from '@/modules/stats/repositories'
import { getServerTotalModelCountFactory } from '@/modules/core/services/branch/retrieval'
import { getServerTotalVersionCountFactory } from '@/modules/core/services/commit/retrieval'
import { bullMonitoringRouterFactory } from '@/modules/core/rest/monitoring'
let stopTestSubs: (() => void) | undefined = undefined
@@ -123,7 +124,7 @@ const coreModule: SpeckleModule<{
})()
}
},
async finalize() {
async finalize({ app }) {
// Update server profile in mp
await updateServerMixpanelProfileFactory({
getServerInfo: getCachedServerInfoFactory({ db }),
@@ -135,6 +136,9 @@ const coreModule: SpeckleModule<{
getServerTotalVersionCount: getServerTotalVersionCountFactory(),
logger: coreLogger
})()
// Run BullMQ monitor once the app is fully ready
app.use(bullMonitoringRouterFactory())
},
async shutdown() {
await shutdownResultListener()
@@ -0,0 +1,51 @@
import { getServerOrigin } from '@/modules/shared/helpers/envHelper'
import { Router } from 'express'
import { ExpressAdapter } from '@bull-board/express'
import { createBullBoard } from '@bull-board/api'
import { BullAdapter } from '@bull-board/api/bullAdapter'
import { getActiveQueues } from '@speckle/shared/queue'
import { moduleLogger } from '@/observability/logging'
import { authMiddlewareCreator } from '@/modules/shared/middleware'
import { validateServerRoleBuilderFactory } from '@/modules/shared/authz'
import { getRolesFactory } from '@/modules/shared/repositories/roles'
import { db } from '@/db/knex'
import { Roles } from '@speckle/shared'
/**
* Has to be invoked after all speckle modules are initialized, cause only then we have
* the full set of Bull queues registered.
*/
export const bullMonitoringRouterFactory = (): Router => {
const router = Router()
const relativeUrl = '/api/admin/bull-jobs'
const url = new URL(relativeUrl, getServerOrigin())
const queues = getActiveQueues()
moduleLogger.info(
`Initializing Bull monitoring UI with ${
Object.keys(queues).length
} queues at ${url.toString()}`
)
const serverAdapter = new ExpressAdapter()
serverAdapter.setBasePath(relativeUrl)
createBullBoard({
serverAdapter,
queues: Object.values(queues).map((q) => new BullAdapter(q))
})
router.use(
relativeUrl,
// Admin only
async (req, res, next) => {
await authMiddlewareCreator([
validateServerRoleBuilderFactory({ getRoles: getRolesFactory({ db }) })({
requiredRole: Roles.Server.Admin
})
])(req, res, next)
},
serverAdapter.getRouter()
)
return router
}
+2 -31
View File
@@ -34,7 +34,7 @@ import {
} from '@/modules/core/repositories/scheduledTasks'
import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations'
import { manageFileImportExpiryFactory } from '@/modules/fileuploads/services/tasks'
import { Roles, TIME } from '@speckle/shared'
import { TIME } from '@speckle/shared'
import { FileUploadDatabaseEvents } from '@/modules/fileuploads/domain/consts'
import { fileuploadRouterFactory } from '@/modules/fileuploads/rest/router'
import { nextGenFileImporterRouterFactory } from '@/modules/fileuploads/rest/nextGenRouter'
@@ -45,11 +45,6 @@ import {
fileImportQueues
} from '@/modules/fileuploads/queues/fileimports'
import { initializeEventListenersFactory } from '@/modules/fileuploads/events/eventListener'
import { createBullBoard } from 'bull-board'
import { BullMQAdapter } from 'bull-board/bullMQAdapter'
import { authMiddlewareCreator } from '@/modules/shared/middleware'
import { getRolesFactory } from '@/modules/shared/repositories/roles'
import { validateServerRoleBuilderFactory } from '@/modules/shared/authz'
import {
initializeMetrics,
ObserveResult
@@ -114,32 +109,8 @@ export const init: SpeckleModule['init'] = async ({
if (isInitial) {
if (FF_NEXT_GEN_FILE_IMPORTER_ENABLED) {
const rhinoQueue = await initializeRhinoQueue()
const rhinoRouter = createBullBoard([new BullMQAdapter(rhinoQueue.queue)]).router
app.use(
'/api/admin/fileimport-jobs/rhino',
async (req, res, next) => {
await authMiddlewareCreator([
validateServerRoleBuilderFactory({ getRoles: getRolesFactory({ db }) })({
requiredRole: Roles.Server.Admin
})
])(req, res, next)
},
rhinoRouter
)
const ifcQueue = await initializeIfcQueue()
const ifcRouter = createBullBoard([new BullMQAdapter(ifcQueue.queue)]).router
app.use(
'/api/admin/fileimport-jobs/ifc',
async (req, res, next) => {
await authMiddlewareCreator([
validateServerRoleBuilderFactory({ getRoles: getRolesFactory({ db }) })({
requiredRole: Roles.Server.Admin
})
])(req, res, next)
},
ifcRouter
)
;({ observeResult } = initializeMetrics({
registers: [metricsRegister],
requestQueues: [rhinoQueue, ifcQueue]
+1 -23
View File
@@ -2,19 +2,13 @@
import { moduleLogger, previewLogger as logger } from '@/observability/logging'
import { consumePreviewResultFactory } from '@/modules/previews/resultListener'
import { db } from '@/db/knex'
import {
disablePreviews,
getPreviewServiceRedisUrl,
getRedisUrl,
getServerOrigin
} from '@/modules/shared/helpers/envHelper'
import { createBullBoard } from 'bull-board'
import { BullMQAdapter } from 'bull-board/bullMQAdapter'
import { authMiddlewareCreator } from '@/modules/shared/middleware'
import { ensureError, Roles, TIME } from '@speckle/shared'
import { validateServerRoleBuilderFactory } from '@/modules/shared/authz'
import { getRolesFactory } from '@/modules/shared/repositories/roles'
import { ensureError, TIME } from '@speckle/shared'
import { previewRouterFactory } from '@/modules/previews/rest/router'
import type { SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import {
@@ -100,22 +94,6 @@ export const init: SpeckleModule['init'] = async ({
previewResponseQueue
})
const router = createBullBoard([
new BullMQAdapter(previewRequestQueue),
new BullMQAdapter(previewResponseQueue)
]).router
app.use(
'/api/admin/preview-jobs',
async (req, res, next) => {
await authMiddlewareCreator([
validateServerRoleBuilderFactory({ getRoles: getRolesFactory({ db }) })({
requiredRole: Roles.Server.Admin
})
])(req, res, next)
},
router
)
const previewRouter = previewRouterFactory({
previewRequestQueue,
responseQueueName
+1 -1
View File
@@ -50,6 +50,7 @@
"@aws-sdk/client-s3": "^3.276.0",
"@aws-sdk/lib-storage": "^3.100.0",
"@aws-sdk/s3-request-presigner": "3.352.0",
"@bull-board/express": "^4.2.2",
"@godaddy/terminus": "^4.9.0",
"@graphql-tools/mock": "^9.0.4",
"@graphql-tools/schema": "^10.0.6",
@@ -140,7 +141,6 @@
},
"devDependencies": {
"@apollo/rover": "^0.23.0",
"@bull-board/express": "^4.2.2",
"@faker-js/faker": "^8.4.1",
"@graphql-codegen/cli": "^5.0.5",
"@graphql-codegen/typed-document-node": "^5.1.1",
+18 -6
View File
@@ -2,12 +2,14 @@ import Bull from 'bull'
import { Redis } from 'ioredis'
import { isRedisReady } from '../redis/isRedisReady.js'
// we're caching this here, so that there is one client for the app lifecycle
type ClientCache = Record<string, { client?: Redis; subscriber?: Redis }>
// we're caching this here, so that there is one client for the app lifecycle
const clientCache: ClientCache = {}
// so we can get all active queues for monitoring
const queueCache: Record<string, Bull.Queue> = {}
export const initializeQueue = async <T>({
queueName,
redisUrl,
@@ -57,13 +59,23 @@ export const initializeQueue = async <T>({
}
}
}
const newQueue = new Bull<T>(queueName, opts)
// bull does not check if redis is ready...
//
// logger.info('Checking Redis connection is ready...')
queueCache[queueName] = newQueue
// When newQueue closed, remove from cache
newQueue.on('close', () => {
delete queueCache[queueName]
})
newQueue.client.on('end', () => {
delete queueCache[queueName]
})
if (!clientCache[redisUrl].client)
throw new Error('Redis client not properly initialized')
await isRedisReady(clientCache[redisUrl].client)
// await isRedisReady(clientCache[redisUrl].subscriber)
return await newQueue.isReady()
}
export const getActiveQueues = () => ({ ...queueCache })