diff --git a/packages/fileimport-service/package.json b/packages/fileimport-service/package.json index 1223c9499..dc43e2eea 100644 --- a/packages/fileimport-service/package.json +++ b/packages/fileimport-service/package.json @@ -33,6 +33,7 @@ "dependencies": { "@speckle/shared": "workspace:^", "bcrypt": "^5.0.0", + "bull": "^4.16.5", "crypto": "^1.0.1", "crypto-random-string": "^3.2.0", "dotenv": "^16.4.5", @@ -47,7 +48,8 @@ "tarn": "^3.0.2", "undici": "^5.28.4", "valid-filename": "^3.1.0", - "web-ifc": "^0.0.36" + "web-ifc": "^0.0.36", + "znv": "^0.5.0" }, "devDependencies": { "@types/bcrypt": "^5.0.0", diff --git a/packages/preview-frontend/src/main.ts b/packages/preview-frontend/src/main.ts index b6b692f34..b9c831166 100644 --- a/packages/preview-frontend/src/main.ts +++ b/packages/preview-frontend/src/main.ts @@ -5,7 +5,7 @@ import { PreviewGenerator, PreviewPageResult, TakeScreenshot -} from '@speckle/shared/dist/esm/workers/previews/interface.js' +} from '@speckle/shared/workers/previews' import { Viewer, DefaultViewerParams, diff --git a/packages/preview-service/scripts/publishTask.ts b/packages/preview-service/scripts/publishTask.ts index dd1fedbd4..71ff29e41 100644 --- a/packages/preview-service/scripts/publishTask.ts +++ b/packages/preview-service/scripts/publishTask.ts @@ -1,7 +1,10 @@ -import Bull from 'bull' import { REDIS_URL } from '../src/config.js' +import { initializeQueue } from '@speckle/shared/queue' -const jobQueue = new Bull('preview-service-jobs', REDIS_URL) +const jobQueue = await initializeQueue({ + queueName: 'preview-service-jobs', + redisUrl: REDIS_URL +}) await jobQueue.add({ url: 'https://latest.speckle.systems/projects/8b94a55ee5/models/7f98c5b62e', diff --git a/packages/preview-service/src/jobProcessor.ts b/packages/preview-service/src/jobProcessor.ts index 3691bfe8f..85dc4052e 100644 --- a/packages/preview-service/src/jobProcessor.ts +++ b/packages/preview-service/src/jobProcessor.ts @@ -1,13 +1,12 @@ import { Page, Browser } from 'puppeteer' import type { Logger } from 'pino' -import type { PreviewGenerator } from '@speckle/shared/dist/esm/workers/previews/interface.js' import type { + PreviewGenerator, JobPayload, PreviewResultPayload -} from '@speckle/shared/dist/esm/workers/previews/job.js' - -import { AppState } from '@/const.js' +} from '@speckle/shared/workers/previews' +import { AppState } from '@speckle/shared/workers' import { TIME_MS } from '@speckle/shared' declare global { diff --git a/packages/preview-service/src/main.ts b/packages/preview-service/src/main.ts index a643dcfca..1392aae9c 100644 --- a/packages/preview-service/src/main.ts +++ b/packages/preview-service/src/main.ts @@ -2,11 +2,10 @@ import express from 'express' import puppeteer, { Browser } from 'puppeteer' import { createTerminus } from '@godaddy/terminus' import type { Logger } from 'pino' -import { Redis, type RedisOptions } from 'ioredis' -import Bull, { type QueueOptions } from 'bull' - -import { jobPayload } from '@speckle/shared/dist/esm/workers/previews/job.js' +import type Bull from 'bull' +import { JobPayload, PreviewResultPayload } from '@speckle/shared/workers/previews' +import { AppState } from '@speckle/shared/workers' import { REDIS_URL, HOST, @@ -19,10 +18,10 @@ import { } from '@/config.js' import { logger } from '@/logging.js' import { jobProcessor } from '@/jobProcessor.js' -import { AppState } from '@/const.js' import { initMetrics, initPrometheusRegistry } from '@/metrics.js' import { ensureError } from '@speckle/shared' -import { isRedisReady } from '@/utils.js' +import { initializeQueue } from '@speckle/shared/queue' +import { isRedisReady } from '@speckle/shared/redis' const app = express() const host = HOST @@ -35,40 +34,7 @@ let appState: AppState = AppState.STARTING // serve the preview-frontend app.use(express.static('public')) await initMetrics({ app, registry: initPrometheusRegistry() }) - -let client: Redis -let subscriber: Redis - -const opts: QueueOptions = { - // redisOpts here will contain at least a property of connectionName which will identify the queue based on its name - createClient(type: string, redisOpts: RedisOptions) { - switch (type) { - case 'client': - if (!client) { - client = new Redis(REDIS_URL, redisOpts) - } - return client - case 'subscriber': - if (!subscriber) { - subscriber = new Redis(REDIS_URL, { - ...redisOpts, - maxRetriesPerRequest: null, - enableReadyCheck: false - }) - } - return subscriber - case 'bclient': - return new Redis(REDIS_URL, { - ...redisOpts, - maxRetriesPerRequest: null, - enableReadyCheck: false - }) - default: - throw new Error('Unexpected connection type: ' + type) - } - } -} -let jobQueue: Bull.Queue | undefined = undefined +let jobQueue: Bull.Queue | undefined = undefined // store this callback, so on shutdown we can error the job let currentJob: { logger: Logger; done: Bull.DoneCallback } | undefined = undefined @@ -116,16 +82,10 @@ const server = app.listen(port, host, async () => { } try { - const newQueue = new Bull(JobQueueName, opts) - - logger.info('Checking Redis connection is ready...') - - // Bull's Queue.isReady() does not actually check the Redis connection - // see https://github.com/OptimalBits/bull/issues/1873#issuecomment-953581143 - await isRedisReady(newQueue.client) - logger.info('Redis is ready') - - jobQueue = await newQueue.isReady() + jobQueue = await initializeQueue({ + queueName: JobQueueName, + redisUrl: REDIS_URL + }) } catch (e) { const err = ensureError(e, 'Unknown error creating job queue') logger.error({ err }, 'Error creating job queue') @@ -154,21 +114,15 @@ const server = app.listen(port, host, async () => { try { currentJob = { done, logger: jobLogger } - const parseResult = jobPayload.safeParse(payload.data) - if (!parseResult.success) { - jobLogger.error( - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - { parseError: parseResult.error, payload: payload.data }, - 'Invalid job payload' - ) - return done(parseResult.error) - } - const job = parseResult.data + const job = payload.data jobLogger = jobLogger.child({ jobId: job.jobId, serverUrl: job.url }) - const resultsQueue = new Bull(job.responseQueue, opts) + const resultsQueue = await initializeQueue({ + queueName: job.responseQueue, + redisUrl: REDIS_URL + }) browser = await launchBrowser() const result = await jobProcessor({ diff --git a/packages/preview-service/tsconfig.json b/packages/preview-service/tsconfig.json index c6f230d93..100a327cf 100644 --- a/packages/preview-service/tsconfig.json +++ b/packages/preview-service/tsconfig.json @@ -31,7 +31,7 @@ /* Modules */ "module": "ES2022" /* Specify what module code is generated. */, // "rootDir": "./", /* Specify the root folder within your source files. */ - "moduleResolution": "node", + "moduleResolution": "bundler", // "moduleResolution": "node10", /* Specify how TypeScript looks up a file from a given module specifier. */ // "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */ // "paths": {}, /* Specify a set of entries that re-map imports to additional lookup locations. */ diff --git a/packages/server/healthchecks/redis.ts b/packages/server/healthchecks/redis.ts index a9af0e8d6..fb0670904 100644 --- a/packages/server/healthchecks/redis.ts +++ b/packages/server/healthchecks/redis.ts @@ -1,5 +1,5 @@ import type { CheckResponse, RedisCheck } from '@/healthchecks/types' -import { isRedisReady } from '@/modules/shared/redis/redis' +import { isRedisReady } from '@speckle/shared/dist/commonjs/redis/isRedisReady.js' export const isRedisAlive: RedisCheck = async (params): Promise => { const { client } = params diff --git a/packages/server/modules/cli/commands/activities/send.ts b/packages/server/modules/cli/commands/activities/send.ts index e06e71b72..3bca4371b 100644 --- a/packages/server/modules/cli/commands/activities/send.ts +++ b/packages/server/modules/cli/commands/activities/send.ts @@ -18,7 +18,7 @@ const command: CommandModule = { }) }, handler: async (argv) => { - initializeQueue() + await initializeQueue() const numberOfDays = argv.days as number const end = new Date() const start = new Date(end.getTime()) diff --git a/packages/server/modules/cli/commands/bull/monitor.ts b/packages/server/modules/cli/commands/bull/monitor.ts index 7501d2ebb..e4315cb79 100644 --- a/packages/server/modules/cli/commands/bull/monitor.ts +++ b/packages/server/modules/cli/commands/bull/monitor.ts @@ -26,11 +26,11 @@ const command: CommandModule = { const testQueueId = argv.testQueueId cliLogger.info('Initializing bull queues...') - const queues = [buildNotificationsQueue(NOTIFICATIONS_QUEUE)] + const queues = [await buildNotificationsQueue(NOTIFICATIONS_QUEUE)] if (testQueueId) { cliLogger.info('Also initializing queue %s...', testQueueId) - queues.push(buildNotificationsQueue(testQueueId)) + queues.push(await buildNotificationsQueue(testQueueId)) } cliLogger.info('Initializing monitor...') diff --git a/packages/server/modules/cli/commands/bull/test-push.ts b/packages/server/modules/cli/commands/bull/test-push.ts index 9e566275d..6caca5625 100644 --- a/packages/server/modules/cli/commands/bull/test-push.ts +++ b/packages/server/modules/cli/commands/bull/test-push.ts @@ -24,7 +24,7 @@ const command: CommandModule = { }) }, handler: async (argv) => { - initializeQueue() + await initializeQueue() // we don't want to submit a real mentions payload, this is for testing only await publishNotification(NotificationType.MentionedInComment, { diff --git a/packages/server/modules/multiregion/services/queue.ts b/packages/server/modules/multiregion/services/queue.ts index 3c2b8b9ae..cfdfb8189 100644 --- a/packages/server/modules/multiregion/services/queue.ts +++ b/packages/server/modules/multiregion/services/queue.ts @@ -3,7 +3,6 @@ import { logger } from '@/observability/logging' import { isProdEnv, isTestEnv } from '@/modules/shared/helpers/envHelper' import cryptoRandomString from 'crypto-random-string' import { Optional, TIME_MS } from '@speckle/shared' -import { buildBaseQueueOptions } from '@/modules/shared/helpers/bullHelper' import { UninitializedResourceAccessError } from '@/modules/shared/errors' import { MultiRegionInvalidJobError, @@ -30,6 +29,7 @@ import { } from '@/modules/multiregion/repositories/projectRegion' import { updateProjectRegionKeyFactory } from '@/modules/multiregion/services/projectRegion' import { getGenericRedis } from '@/modules/shared/redis/redis' +import { initializeQueue as setupQueue } from '@speckle/shared/dist/commonjs/queue/index.js' import { getEventBus } from '@/modules/shared/services/eventBus' import { copyWorkspaceFactory, @@ -49,6 +49,7 @@ import { countProjectWebhooksFactory } from '@/modules/workspaces/repositories/projectRegions' import { withTransaction } from '@/modules/shared/helpers/dbHelper' +import { getRedisUrl } from '@/modules/shared/helpers/envHelper' const MULTIREGION_QUEUE_NAME = isTestEnv() ? `test:multiregion:${cryptoRandomString({ length: 5 })}` @@ -77,30 +78,7 @@ type MultiregionJob = let queue: Optional> -export const buildMultiregionQueue = (queueName: string) => - new Bull(queueName, { - ...buildBaseQueueOptions(), - ...(!isTestEnv() - ? { - limiter: { - max: 10, - duration: TIME_MS.second - } - } - : {}), - defaultJobOptions: { - attempts: 5, - timeout: 15 * TIME_MS.minute, - backoff: { - type: 'fixed', - delay: 5 * TIME_MS.minute - }, - removeOnComplete: isProdEnv(), - removeOnFail: false - } - }) - -export const getQueue = (): Bull.Queue => { +export const getQueue = (): Bull.Queue => { if (!queue) { throw new UninitializedResourceAccessError( 'Attempting to use uninitialized Bull queue' @@ -110,8 +88,31 @@ export const getQueue = (): Bull.Queue => { return queue } -export const initializeQueue = () => { - queue = buildMultiregionQueue(MULTIREGION_QUEUE_NAME) +export const initializeQueue = async () => { + queue = await setupQueue({ + queueName: MULTIREGION_QUEUE_NAME, + redisUrl: getRedisUrl(), + options: { + ...(!isTestEnv() + ? { + limiter: { + max: 10, + duration: TIME_MS.second + } + } + : {}), + defaultJobOptions: { + attempts: 5, + timeout: 15 * TIME_MS.minute, + backoff: { + type: 'fixed', + delay: 5 * TIME_MS.minute + }, + removeOnComplete: isProdEnv(), + removeOnFail: false + } + } + }) } /** diff --git a/packages/server/modules/notifications/index.ts b/packages/server/modules/notifications/index.ts index c66402a36..89f31a711 100644 --- a/packages/server/modules/notifications/index.ts +++ b/packages/server/modules/notifications/index.ts @@ -36,7 +36,7 @@ export async function initializeConsumption( registerNotificationHandlers(customHandlers || allHandlers) - initializeQueue() + await initializeQueue() if (shouldDisableNotificationsConsumption()) { moduleLogger.info('Skipping notification consumption...') diff --git a/packages/server/modules/notifications/services/queue.ts b/packages/server/modules/notifications/services/queue.ts index 31e0acc7d..ccec279f2 100644 --- a/packages/server/modules/notifications/services/queue.ts +++ b/packages/server/modules/notifications/services/queue.ts @@ -12,9 +12,9 @@ import { NotificationType, NotificationTypeHandlers } from '@/modules/notifications/helpers/types' -import { isProdEnv, isTestEnv } from '@/modules/shared/helpers/envHelper' +import { getRedisUrl, isProdEnv, isTestEnv } from '@/modules/shared/helpers/envHelper' import Bull from 'bull' -import { buildBaseQueueOptions } from '@/modules/shared/helpers/bullHelper' +import { initializeQueue as setupQueue } from '@speckle/shared/dist/commonjs/queue/index.js' import cryptoRandomString from 'crypto-random-string' import { logger, notificationsLogger, Observability } from '@/observability/logging' import { ensureErrorOrWrapAsCause } from '@/modules/shared/errors/ensureError' @@ -47,22 +47,25 @@ if (isTestEnv()) { let queue: Optional -export const buildNotificationsQueue = (queueName: string) => - new Bull(queueName, { - ...buildBaseQueueOptions(), - ...(!isTestEnv() - ? { - limiter: { - max: 10, - duration: TIME_MS.second +export const buildNotificationsQueue = async (queueName: string) => + await setupQueue({ + queueName, + redisUrl: getRedisUrl(), + options: { + ...(!isTestEnv() + ? { + limiter: { + max: 10, + duration: TIME_MS.second + } } - } - : {}), - defaultJobOptions: { - attempts: 1, - timeout: 10 * TIME_MS.second, - removeOnComplete: isProdEnv(), - removeOnFail: isProdEnv() + : {}), + defaultJobOptions: { + attempts: 1, + timeout: 10 * TIME_MS.second, + removeOnComplete: isProdEnv(), + removeOnFail: isProdEnv() + } } }) @@ -82,8 +85,8 @@ export function getQueue(): Bull.Queue { /** * Initialize notifications queue */ -export function initializeQueue() { - queue = buildNotificationsQueue(NOTIFICATIONS_QUEUE) +export async function initializeQueue() { + queue = await buildNotificationsQueue(NOTIFICATIONS_QUEUE) } /** diff --git a/packages/server/modules/previews/index.ts b/packages/server/modules/previews/index.ts index d3d290f6f..b8b2144b1 100644 --- a/packages/server/modules/previews/index.ts +++ b/packages/server/modules/previews/index.ts @@ -9,8 +9,6 @@ import { getRedisUrl, getServerOrigin } from '@/modules/shared/helpers/envHelper' -import Bull, { type QueueOptions } from 'bull' -import Redis, { type RedisOptions } from 'ioredis' import { createBullBoard } from 'bull-board' import { BullMQAdapter } from 'bull-board/bullMQAdapter' import { authMiddlewareCreator } from '@/modules/shared/middleware' @@ -19,7 +17,11 @@ import { validateServerRoleBuilderFactory } from '@/modules/shared/authz' import { getRolesFactory } from '@/modules/shared/repositories/roles' import { previewRouterFactory } from '@/modules/previews/rest/router' import type { SpeckleModule } from '@/modules/shared/helpers/typeHelper' -import { previewResultPayload } from '@speckle/shared/dist/commonjs/workers/previews/job.js' +import { + JobPayload, + PreviewResultPayload, + previewResultPayload +} from '@speckle/shared/dist/commonjs/workers/previews/job.js' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { storePreviewFactory, @@ -31,59 +33,32 @@ import { PreviewJobDurationStep } from '@/modules/previews/observability/metrics' import { addRequestQueueListeners } from '@/modules/previews/queues/previews' -import { isRedisReady } from '@/modules/shared/redis/redis' +import { initializeQueue } from '@speckle/shared/dist/commonjs/queue/index.js' +import type Bull from 'bull' const JobQueueName = 'preview-service-jobs' const ResponseQueueNamePrefix = 'preview-service-results' const getPreviewQueues = async (params: { responseQueueName: string }) => { const { responseQueueName } = params - let client: Redis - let subscriber: Redis const redisUrl = getPreviewServiceRedisUrl() ?? getRedisUrl() - const opts: QueueOptions = { - // redisOpts here will contain at least a property of connectionName which will identify the queue based on its name - createClient(type: string, redisOpts: RedisOptions) { - switch (type) { - case 'client': - if (!client) { - client = new Redis(redisUrl, redisOpts) - } - return client - case 'subscriber': - if (!subscriber) { - subscriber = new Redis(redisUrl, { - ...redisOpts, - maxRetriesPerRequest: null, - enableReadyCheck: false - }) - } - return subscriber - case 'bclient': - return new Redis(redisUrl, { - ...redisOpts, - maxRetriesPerRequest: null, - enableReadyCheck: false - }) - default: - throw new Error('Unexpected connection type: ' + type) - } - } - } - // previews are requested on this queue - const previewRequestQueue = new Bull(JobQueueName, opts) - await isRedisReady(previewRequestQueue.client) + const previewRequestQueue = await initializeQueue({ + queueName: JobQueueName, + redisUrl + }) addRequestQueueListeners({ logger, previewRequestQueue }) // rendered previews are sent back on this queue - const previewResponseQueue = new Bull(responseQueueName, opts) + const previewResponseQueue = await initializeQueue({ + queueName: responseQueueName, + redisUrl + }) - await isRedisReady(previewResponseQueue.client) return { previewRequestQueue, previewResponseQueue } } @@ -103,8 +78,8 @@ export const init: SpeckleModule['init'] = async ({ new URL(getServerOrigin()).hostname }` - let previewRequestQueue: Bull.Queue - let previewResponseQueue: Bull.Queue + let previewRequestQueue: Bull.Queue + let previewResponseQueue: Bull.Queue try { ;({ previewRequestQueue, previewResponseQueue } = await getPreviewQueues({ diff --git a/packages/server/modules/previews/queues/previews.ts b/packages/server/modules/previews/queues/previews.ts index 9b5a4eacb..2ed5a2d52 100644 --- a/packages/server/modules/previews/queues/previews.ts +++ b/packages/server/modules/previews/queues/previews.ts @@ -5,6 +5,7 @@ import type { EventEmitter } from 'stream' import { upsertObjectPreviewFactory } from '@/modules/previews/repository/previews' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { PreviewStatus } from '@/modules/previews/domain/consts' +import { JobPayload } from '@speckle/shared/dist/commonjs/workers/previews' export const requestObjectPreviewFactory = ({ @@ -12,7 +13,7 @@ export const requestObjectPreviewFactory = queue }: { responseQueue: string - queue: Queue + queue: Queue }): RequestObjectPreview => async ({ jobId, token, url }) => { const payload = { jobId, token, url, responseQueue } diff --git a/packages/server/modules/shared/helpers/bullHelper.ts b/packages/server/modules/shared/helpers/bullHelper.ts deleted file mode 100644 index bf419121c..000000000 --- a/packages/server/modules/shared/helpers/bullHelper.ts +++ /dev/null @@ -1,20 +0,0 @@ -import Bull from 'bull' -import { getRedisUrl } from '@/modules/shared/helpers/envHelper' -import { createRedisClient } from '@/modules/shared/redis/redis' - -export function buildBaseQueueOptions(): Bull.QueueOptions { - return { - createClient: (type) => { - const client = createRedisClient(getRedisUrl(), { - ...(['bclient', 'subscriber'].includes(type) - ? { - enableReadyCheck: false, - maxRetriesPerRequest: null - } - : {}) - }) - - return client - } - } -} diff --git a/packages/server/modules/shared/redis/redis.ts b/packages/server/modules/shared/redis/redis.ts index 293f54062..2a8022309 100644 --- a/packages/server/modules/shared/redis/redis.ts +++ b/packages/server/modules/shared/redis/redis.ts @@ -5,7 +5,6 @@ import { MisconfiguredEnvironmentError } from '@/modules/shared/errors' import { getRedisUrl } from '@/modules/shared/helpers/envHelper' -import { ensureError } from '@speckle/shared' export function createRedisClient(redisUrl: string, redisOptions: RedisOptions): Redis { let redisClient: Redis @@ -35,36 +34,3 @@ export const getGenericRedis = (): Redis => { if (!redisClient) redisClient = createRedisClient(getRedisUrl(), {}) return redisClient } - -export const isRedisReady = (client: Redis) => { - // MIT Licensed: https://github.com/OptimalBits/bull/blob/develop/LICENSE.md - // Reference: https://github.com/OptimalBits/bull/blob/develop/lib/utils.js - return new Promise((resolve, reject) => { - if (client.status === 'ready') { - resolve() - } else { - function handleReady() { - client.removeListener('end', handleEnd) - client.removeListener('error', handleError) - resolve() - } - - function handleError(e: unknown) { - const err = ensureError(e, 'Unknown error in Redis client') - client.removeListener('ready', handleReady) - client.removeListener('error', handleError) - reject(err) - } - - function handleEnd() { - client.removeListener('ready', handleReady) - client.removeListener('error', handleError) - reject(new Error('Redis connection ended')) - } - - client.once('ready', handleReady) - client.on('error', handleError) - client.once('end', handleEnd) - } - }) -} diff --git a/packages/shared/package.json b/packages/shared/package.json index 771e78a35..407b4bdfd 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -46,6 +46,7 @@ }, "peerDependencies": { "@tiptap/core": "^2.0.0-beta.176", + "bull": "*", "knex": "*", "mixpanel": "^0.17.0", "pino": "^8.7.0", @@ -64,6 +65,7 @@ "@typescript-eslint/parser": "^7.12.0", "@vitest/coverage-v8": "^3.0.9", "@vitest/ui": "^3.0.9", + "bull": "^4.16.5", "crypto-random-string": "^5.0.0", "eslint": "^9.4.0", "eslint-config-prettier": "^9.1.0", @@ -87,6 +89,11 @@ "./environment": "./src/environment/index.ts", "./observability": "./src/observability/index.ts", "./authz": "./src/authz/index.ts", + "./queue": "./src/queue/index.ts", + "./redis": "./src/redis/index.ts", + "./workers": "./src/workers/index.ts", + "./workers/previews": "./src/workers/previews/index.ts", + "./workers/fileimport": "./src/workers/fileimport/index.ts", "./dist/*": "./dist/*" }, "exclude": [ @@ -143,6 +150,56 @@ "default": "./dist/commonjs/authz/index.js" } }, + "./queue": { + "import": { + "types": "./dist/esm/queue/index.d.ts", + "default": "./dist/esm/queue/index.js" + }, + "require": { + "types": "./dist/commonjs/queue/index.d.ts", + "default": "./dist/commonjs/queue/index.js" + } + }, + "./redis": { + "import": { + "types": "./dist/esm/redis/index.d.ts", + "default": "./dist/esm/redis/index.js" + }, + "require": { + "types": "./dist/commonjs/redis/index.d.ts", + "default": "./dist/commonjs/redis/index.js" + } + }, + "./workers": { + "import": { + "types": "./dist/esm/workers/index.d.ts", + "default": "./dist/esm/workers/index.js" + }, + "require": { + "types": "./dist/commonjs/workers/index.d.ts", + "default": "./dist/commonjs/workers/index.js" + } + }, + "./workers/previews": { + "import": { + "types": "./dist/esm/workers/previews/index.d.ts", + "default": "./dist/esm/workers/previews/index.js" + }, + "require": { + "types": "./dist/commonjs/workers/previews/index.d.ts", + "default": "./dist/commonjs/workers/previews/index.js" + } + }, + "./workers/fileimport": { + "import": { + "types": "./dist/esm/workers/fileimport/index.d.ts", + "default": "./dist/esm/workers/fileimport/index.js" + }, + "require": { + "types": "./dist/commonjs/workers/fileimport/index.d.ts", + "default": "./dist/commonjs/workers/fileimport/index.js" + } + }, "./dist/*": "./dist/*" } } diff --git a/packages/shared/src/queue/config.ts b/packages/shared/src/queue/config.ts new file mode 100644 index 000000000..b2f302934 --- /dev/null +++ b/packages/shared/src/queue/config.ts @@ -0,0 +1,69 @@ +import Bull from 'bull' +import { Redis } from 'ioredis' +import { isRedisReady } from '../redis/isRedisReady.js' + +// we're caching this here, so that there is one client for the app lifecycle + +type ClientCache = Record + +const clientCache: ClientCache = {} + +export const initializeQueue = async ({ + queueName, + redisUrl, + options +}: { + queueName: string + redisUrl: string + options?: Partial +}): Promise> => { + if (!(redisUrl in clientCache)) clientCache[redisUrl] = {} + const opts: Bull.QueueOptions = { + ...options, + // redisOpts here will contain at least a property of connectionName which will identify the queue based on its name + createClient(type, redisOpts) { + switch (type) { + case 'client': + if (redisUrl in clientCache && clientCache[redisUrl].client !== undefined) { + return clientCache[redisUrl].client + } else { + const client = new Redis(redisUrl, redisOpts ?? {}) + clientCache[redisUrl].client = client + return client + } + case 'subscriber': + if ( + redisUrl in clientCache && + clientCache[redisUrl].subscriber !== undefined + ) { + return clientCache[redisUrl].subscriber + } else { + const subscriber = new Redis(redisUrl, { + ...redisOpts, + maxRetriesPerRequest: null, + enableReadyCheck: false + }) + clientCache[redisUrl].subscriber = subscriber + return subscriber + } + case 'bclient': + return new Redis(redisUrl, { + ...redisOpts, + maxRetriesPerRequest: null, + enableReadyCheck: false + }) + default: + throw new Error(`Unexpected connection type: ${type}`) + } + } + } + const newQueue = new Bull(queueName, opts) + // bull does not check if redis is ready... + // + // logger.info('Checking Redis connection is ready...') + if (!clientCache[redisUrl].client) + throw new Error('Redis client not properly initialized') + await isRedisReady(clientCache[redisUrl].client) + // await isRedisReady(clientCache[redisUrl].subscriber) + return await newQueue.isReady() +} diff --git a/packages/shared/src/queue/index.ts b/packages/shared/src/queue/index.ts new file mode 100644 index 000000000..3f1ecb4e6 --- /dev/null +++ b/packages/shared/src/queue/index.ts @@ -0,0 +1 @@ +export * from './config.js' diff --git a/packages/shared/src/redis/index.ts b/packages/shared/src/redis/index.ts new file mode 100644 index 000000000..d1b99e028 --- /dev/null +++ b/packages/shared/src/redis/index.ts @@ -0,0 +1 @@ +export * from './isRedisReady.js' diff --git a/packages/preview-service/src/utils.ts b/packages/shared/src/redis/isRedisReady.ts similarity index 91% rename from packages/preview-service/src/utils.ts rename to packages/shared/src/redis/isRedisReady.ts index a0d6401fb..4415dee20 100644 --- a/packages/preview-service/src/utils.ts +++ b/packages/shared/src/redis/isRedisReady.ts @@ -1,5 +1,5 @@ -import { ensureError } from '@speckle/shared' -import { Redis, type RedisOptions } from 'ioredis' +import { Redis } from 'ioredis' +import { ensureError } from '../core/helpers/error.js' // MIT Licensed: https://github.com/OptimalBits/bull/blob/develop/LICENSE.md // Reference: https://github.com/OptimalBits/bull/blob/develop/lib/utils.js diff --git a/packages/shared/src/workers/fileimport/index.ts b/packages/shared/src/workers/fileimport/index.ts new file mode 100644 index 000000000..9f1b56890 --- /dev/null +++ b/packages/shared/src/workers/fileimport/index.ts @@ -0,0 +1 @@ +export * from './job.js' diff --git a/packages/shared/src/workers/index.ts b/packages/shared/src/workers/index.ts new file mode 100644 index 000000000..f27671049 --- /dev/null +++ b/packages/shared/src/workers/index.ts @@ -0,0 +1 @@ +export * from './state.js' diff --git a/packages/shared/src/workers/previews/index.ts b/packages/shared/src/workers/previews/index.ts new file mode 100644 index 000000000..1bdd865d3 --- /dev/null +++ b/packages/shared/src/workers/previews/index.ts @@ -0,0 +1,2 @@ +export * from './interface.js' +export * from './job.js' diff --git a/packages/preview-service/src/const.ts b/packages/shared/src/workers/state.ts similarity index 100% rename from packages/preview-service/src/const.ts rename to packages/shared/src/workers/state.ts diff --git a/yarn.lock b/yarn.lock index 9ef00974b..b17ffdbb1 100644 --- a/yarn.lock +++ b/yarn.lock @@ -16480,6 +16480,7 @@ __metadata: "@types/node": "npm:^18.19.38" "@vitest/coverage-istanbul": "npm:^1.6.0" bcrypt: "npm:^5.0.0" + bull: "npm:^4.16.5" concurrently: "npm:^8.2.2" crypto: "npm:^1.0.1" crypto-random-string: "npm:^3.2.0" @@ -16505,6 +16506,7 @@ __metadata: valid-filename: "npm:^3.1.0" vitest: "npm:^1.6.0" web-ifc: "npm:^0.0.36" + znv: "npm:^0.5.0" languageName: unknown linkType: soft @@ -16985,6 +16987,7 @@ __metadata: "@typescript-eslint/parser": "npm:^7.12.0" "@vitest/coverage-v8": "npm:^3.0.9" "@vitest/ui": "npm:^3.0.9" + bull: "npm:^4.16.5" crypto-random-string: "npm:^5.0.0" dayjs: "npm:^1.11.13" eslint: "npm:^9.4.0" @@ -17006,6 +17009,7 @@ __metadata: zod: "npm:^3.22.4" peerDependencies: "@tiptap/core": ^2.0.0-beta.176 + bull: "*" knex: "*" mixpanel: ^0.17.0 pino: ^8.7.0 @@ -24810,6 +24814,21 @@ __metadata: languageName: node linkType: hard +"bull@npm:^4.16.5": + version: 4.16.5 + resolution: "bull@npm:4.16.5" + dependencies: + cron-parser: "npm:^4.9.0" + get-port: "npm:^5.1.1" + ioredis: "npm:^5.3.2" + lodash: "npm:^4.17.21" + msgpackr: "npm:^1.11.2" + semver: "npm:^7.5.2" + uuid: "npm:^8.3.0" + checksum: 10/3250486db79a99d3cf77f078694b134a4ae66e930e070d05dd099d7f975ab4d70163858641ad1873617ae0b9c0aab0ca5da6cb69e688b9ab91a779997b3b51e3 + languageName: node + linkType: hard + "bundle-name@npm:^4.1.0": version: 4.1.0 resolution: "bundle-name@npm:4.1.0" @@ -26969,6 +26988,15 @@ __metadata: languageName: node linkType: hard +"cron-parser@npm:^4.9.0": + version: 4.9.0 + resolution: "cron-parser@npm:4.9.0" + dependencies: + luxon: "npm:^3.2.1" + checksum: 10/ffca5e532a5ee0923412ee6e4c7f9bbceacc6ddf8810c16d3e9fb4fe5ec7e2de1b6896d7956f304bb6bc96b0ce37ad7e3935304179d52951c18d84107184faa7 + languageName: node + linkType: hard + "croner@npm:^9.0.0": version: 9.0.0 resolution: "croner@npm:9.0.0" @@ -38186,6 +38214,13 @@ __metadata: languageName: node linkType: hard +"luxon@npm:^3.2.1": + version: 3.6.1 + resolution: "luxon@npm:3.6.1" + checksum: 10/35aad425607708c87af110a52c949190bc35b987770079ec8007ef2365cd29639413db3360d2883777aa01cb3ca5bdb37f42ee3e8e5a0dd277fe22e90cc8a786 + languageName: node + linkType: hard + "lz-string@npm:^1.4.4": version: 1.4.4 resolution: "lz-string@npm:1.4.4" @@ -54323,6 +54358,15 @@ __metadata: languageName: node linkType: hard +"znv@npm:^0.5.0": + version: 0.5.0 + resolution: "znv@npm:0.5.0" + peerDependencies: + zod: ^3.24.2 + checksum: 10/4954b21e2856beca67da60d2f647671381284d5434908d69656ccd93c3e9291149f9b6ac1a8015d0b48604c14916241717f85b5a8c3c41d64423c8baebbf9a18 + languageName: node + linkType: hard + "zod-express@npm:^0.0.8": version: 0.0.8 resolution: "zod-express@npm:0.0.8"