refactor(shared): unified queue handling (#4691)

* feat(shared): unified queue initialization in shared

* feat(queues): use the new queue creation everywhere

* chore(shared): move to redis module

* chore(shared): fix export maps

* chore(fileimport): add deps properly

* fix(shared): import fix

* fix(everything): moear imports

* fix(server): cjs imports
This commit is contained in:
Gergő Jedlicska
2025-05-08 16:58:43 +02:00
committed by GitHub
parent a26a5a90a1
commit 2fdcf1bd1d
27 changed files with 281 additions and 221 deletions
@@ -18,7 +18,7 @@ const command: CommandModule = {
})
},
handler: async (argv) => {
initializeQueue()
await initializeQueue()
const numberOfDays = argv.days as number
const end = new Date()
const start = new Date(end.getTime())
@@ -26,11 +26,11 @@ const command: CommandModule<unknown, { testQueueId: string }> = {
const testQueueId = argv.testQueueId
cliLogger.info('Initializing bull queues...')
const queues = [buildNotificationsQueue(NOTIFICATIONS_QUEUE)]
const queues = [await buildNotificationsQueue(NOTIFICATIONS_QUEUE)]
if (testQueueId) {
cliLogger.info('Also initializing queue %s...', testQueueId)
queues.push(buildNotificationsQueue(testQueueId))
queues.push(await buildNotificationsQueue(testQueueId))
}
cliLogger.info('Initializing monitor...')
@@ -24,7 +24,7 @@ const command: CommandModule = {
})
},
handler: async (argv) => {
initializeQueue()
await initializeQueue()
// we don't want to submit a real mentions payload, this is for testing only
await publishNotification(NotificationType.MentionedInComment, {
@@ -3,7 +3,6 @@ import { logger } from '@/observability/logging'
import { isProdEnv, isTestEnv } from '@/modules/shared/helpers/envHelper'
import cryptoRandomString from 'crypto-random-string'
import { Optional, TIME_MS } from '@speckle/shared'
import { buildBaseQueueOptions } from '@/modules/shared/helpers/bullHelper'
import { UninitializedResourceAccessError } from '@/modules/shared/errors'
import {
MultiRegionInvalidJobError,
@@ -30,6 +29,7 @@ import {
} from '@/modules/multiregion/repositories/projectRegion'
import { updateProjectRegionKeyFactory } from '@/modules/multiregion/services/projectRegion'
import { getGenericRedis } from '@/modules/shared/redis/redis'
import { initializeQueue as setupQueue } from '@speckle/shared/dist/commonjs/queue/index.js'
import { getEventBus } from '@/modules/shared/services/eventBus'
import {
copyWorkspaceFactory,
@@ -49,6 +49,7 @@ import {
countProjectWebhooksFactory
} from '@/modules/workspaces/repositories/projectRegions'
import { withTransaction } from '@/modules/shared/helpers/dbHelper'
import { getRedisUrl } from '@/modules/shared/helpers/envHelper'
const MULTIREGION_QUEUE_NAME = isTestEnv()
? `test:multiregion:${cryptoRandomString({ length: 5 })}`
@@ -77,30 +78,7 @@ type MultiregionJob =
let queue: Optional<Bull.Queue<MultiregionJob>>
export const buildMultiregionQueue = (queueName: string) =>
new Bull(queueName, {
...buildBaseQueueOptions(),
...(!isTestEnv()
? {
limiter: {
max: 10,
duration: TIME_MS.second
}
}
: {}),
defaultJobOptions: {
attempts: 5,
timeout: 15 * TIME_MS.minute,
backoff: {
type: 'fixed',
delay: 5 * TIME_MS.minute
},
removeOnComplete: isProdEnv(),
removeOnFail: false
}
})
export const getQueue = (): Bull.Queue => {
export const getQueue = (): Bull.Queue<MultiregionJob> => {
if (!queue) {
throw new UninitializedResourceAccessError(
'Attempting to use uninitialized Bull queue'
@@ -110,8 +88,31 @@ export const getQueue = (): Bull.Queue => {
return queue
}
export const initializeQueue = () => {
queue = buildMultiregionQueue(MULTIREGION_QUEUE_NAME)
export const initializeQueue = async () => {
queue = await setupQueue({
queueName: MULTIREGION_QUEUE_NAME,
redisUrl: getRedisUrl(),
options: {
...(!isTestEnv()
? {
limiter: {
max: 10,
duration: TIME_MS.second
}
}
: {}),
defaultJobOptions: {
attempts: 5,
timeout: 15 * TIME_MS.minute,
backoff: {
type: 'fixed',
delay: 5 * TIME_MS.minute
},
removeOnComplete: isProdEnv(),
removeOnFail: false
}
}
})
}
/**
@@ -36,7 +36,7 @@ export async function initializeConsumption(
registerNotificationHandlers(customHandlers || allHandlers)
initializeQueue()
await initializeQueue()
if (shouldDisableNotificationsConsumption()) {
moduleLogger.info('Skipping notification consumption...')
@@ -12,9 +12,9 @@ import {
NotificationType,
NotificationTypeHandlers
} from '@/modules/notifications/helpers/types'
import { isProdEnv, isTestEnv } from '@/modules/shared/helpers/envHelper'
import { getRedisUrl, isProdEnv, isTestEnv } from '@/modules/shared/helpers/envHelper'
import Bull from 'bull'
import { buildBaseQueueOptions } from '@/modules/shared/helpers/bullHelper'
import { initializeQueue as setupQueue } from '@speckle/shared/dist/commonjs/queue/index.js'
import cryptoRandomString from 'crypto-random-string'
import { logger, notificationsLogger, Observability } from '@/observability/logging'
import { ensureErrorOrWrapAsCause } from '@/modules/shared/errors/ensureError'
@@ -47,22 +47,25 @@ if (isTestEnv()) {
let queue: Optional<Bull.Queue>
export const buildNotificationsQueue = (queueName: string) =>
new Bull(queueName, {
...buildBaseQueueOptions(),
...(!isTestEnv()
? {
limiter: {
max: 10,
duration: TIME_MS.second
export const buildNotificationsQueue = async (queueName: string) =>
await setupQueue({
queueName,
redisUrl: getRedisUrl(),
options: {
...(!isTestEnv()
? {
limiter: {
max: 10,
duration: TIME_MS.second
}
}
}
: {}),
defaultJobOptions: {
attempts: 1,
timeout: 10 * TIME_MS.second,
removeOnComplete: isProdEnv(),
removeOnFail: isProdEnv()
: {}),
defaultJobOptions: {
attempts: 1,
timeout: 10 * TIME_MS.second,
removeOnComplete: isProdEnv(),
removeOnFail: isProdEnv()
}
}
})
@@ -82,8 +85,8 @@ export function getQueue(): Bull.Queue {
/**
* Initialize notifications queue
*/
export function initializeQueue() {
queue = buildNotificationsQueue(NOTIFICATIONS_QUEUE)
export async function initializeQueue() {
queue = await buildNotificationsQueue(NOTIFICATIONS_QUEUE)
}
/**
+17 -42
View File
@@ -9,8 +9,6 @@ import {
getRedisUrl,
getServerOrigin
} from '@/modules/shared/helpers/envHelper'
import Bull, { type QueueOptions } from 'bull'
import Redis, { type RedisOptions } from 'ioredis'
import { createBullBoard } from 'bull-board'
import { BullMQAdapter } from 'bull-board/bullMQAdapter'
import { authMiddlewareCreator } from '@/modules/shared/middleware'
@@ -19,7 +17,11 @@ import { validateServerRoleBuilderFactory } from '@/modules/shared/authz'
import { getRolesFactory } from '@/modules/shared/repositories/roles'
import { previewRouterFactory } from '@/modules/previews/rest/router'
import type { SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { previewResultPayload } from '@speckle/shared/dist/commonjs/workers/previews/job.js'
import {
JobPayload,
PreviewResultPayload,
previewResultPayload
} from '@speckle/shared/dist/commonjs/workers/previews/job.js'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import {
storePreviewFactory,
@@ -31,59 +33,32 @@ import {
PreviewJobDurationStep
} from '@/modules/previews/observability/metrics'
import { addRequestQueueListeners } from '@/modules/previews/queues/previews'
import { isRedisReady } from '@/modules/shared/redis/redis'
import { initializeQueue } from '@speckle/shared/dist/commonjs/queue/index.js'
import type Bull from 'bull'
const JobQueueName = 'preview-service-jobs'
const ResponseQueueNamePrefix = 'preview-service-results'
const getPreviewQueues = async (params: { responseQueueName: string }) => {
const { responseQueueName } = params
let client: Redis
let subscriber: Redis
const redisUrl = getPreviewServiceRedisUrl() ?? getRedisUrl()
const opts: QueueOptions = {
// redisOpts here will contain at least a property of connectionName which will identify the queue based on its name
createClient(type: string, redisOpts: RedisOptions) {
switch (type) {
case 'client':
if (!client) {
client = new Redis(redisUrl, redisOpts)
}
return client
case 'subscriber':
if (!subscriber) {
subscriber = new Redis(redisUrl, {
...redisOpts,
maxRetriesPerRequest: null,
enableReadyCheck: false
})
}
return subscriber
case 'bclient':
return new Redis(redisUrl, {
...redisOpts,
maxRetriesPerRequest: null,
enableReadyCheck: false
})
default:
throw new Error('Unexpected connection type: ' + type)
}
}
}
// previews are requested on this queue
const previewRequestQueue = new Bull(JobQueueName, opts)
await isRedisReady(previewRequestQueue.client)
const previewRequestQueue = await initializeQueue<JobPayload>({
queueName: JobQueueName,
redisUrl
})
addRequestQueueListeners({
logger,
previewRequestQueue
})
// rendered previews are sent back on this queue
const previewResponseQueue = new Bull(responseQueueName, opts)
const previewResponseQueue = await initializeQueue<PreviewResultPayload>({
queueName: responseQueueName,
redisUrl
})
await isRedisReady(previewResponseQueue.client)
return { previewRequestQueue, previewResponseQueue }
}
@@ -103,8 +78,8 @@ export const init: SpeckleModule['init'] = async ({
new URL(getServerOrigin()).hostname
}`
let previewRequestQueue: Bull.Queue
let previewResponseQueue: Bull.Queue
let previewRequestQueue: Bull.Queue<JobPayload>
let previewResponseQueue: Bull.Queue<PreviewResultPayload>
try {
;({ previewRequestQueue, previewResponseQueue } = await getPreviewQueues({
@@ -5,6 +5,7 @@ import type { EventEmitter } from 'stream'
import { upsertObjectPreviewFactory } from '@/modules/previews/repository/previews'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import { PreviewStatus } from '@/modules/previews/domain/consts'
import { JobPayload } from '@speckle/shared/dist/commonjs/workers/previews'
export const requestObjectPreviewFactory =
({
@@ -12,7 +13,7 @@ export const requestObjectPreviewFactory =
queue
}: {
responseQueue: string
queue: Queue
queue: Queue<JobPayload>
}): RequestObjectPreview =>
async ({ jobId, token, url }) => {
const payload = { jobId, token, url, responseQueue }
@@ -1,20 +0,0 @@
import Bull from 'bull'
import { getRedisUrl } from '@/modules/shared/helpers/envHelper'
import { createRedisClient } from '@/modules/shared/redis/redis'
export function buildBaseQueueOptions(): Bull.QueueOptions {
return {
createClient: (type) => {
const client = createRedisClient(getRedisUrl(), {
...(['bclient', 'subscriber'].includes(type)
? {
enableReadyCheck: false,
maxRetriesPerRequest: null
}
: {})
})
return client
}
}
}
@@ -5,7 +5,6 @@ import {
MisconfiguredEnvironmentError
} from '@/modules/shared/errors'
import { getRedisUrl } from '@/modules/shared/helpers/envHelper'
import { ensureError } from '@speckle/shared'
export function createRedisClient(redisUrl: string, redisOptions: RedisOptions): Redis {
let redisClient: Redis
@@ -35,36 +34,3 @@ export const getGenericRedis = (): Redis => {
if (!redisClient) redisClient = createRedisClient(getRedisUrl(), {})
return redisClient
}
export const isRedisReady = (client: Redis) => {
// MIT Licensed: https://github.com/OptimalBits/bull/blob/develop/LICENSE.md
// Reference: https://github.com/OptimalBits/bull/blob/develop/lib/utils.js
return new Promise<void>((resolve, reject) => {
if (client.status === 'ready') {
resolve()
} else {
function handleReady() {
client.removeListener('end', handleEnd)
client.removeListener('error', handleError)
resolve()
}
function handleError(e: unknown) {
const err = ensureError(e, 'Unknown error in Redis client')
client.removeListener('ready', handleReady)
client.removeListener('error', handleError)
reject(err)
}
function handleEnd() {
client.removeListener('ready', handleReady)
client.removeListener('error', handleError)
reject(new Error('Redis connection ended'))
}
client.once('ready', handleReady)
client.on('error', handleError)
client.once('end', handleEnd)
}
})
}