diff --git a/packages/server/modules/previews/clients/bull.ts b/packages/server/modules/previews/clients/bull.ts new file mode 100644 index 000000000..e75e9fc6d --- /dev/null +++ b/packages/server/modules/previews/clients/bull.ts @@ -0,0 +1,103 @@ +import { Queue, type Job } from 'bull' +import type { EventEmitter } from 'stream' +import { initializeQueue } from '@speckle/shared/queue' +import { JobPayload, PreviewResultPayload } from '@speckle/shared/workers/previews' +import { + DelayBetweenPreviewRetriesMinutes, + NumberOfPreviewRetries +} from '@/modules/previews/domain/consts' +import { TIME, TIME_MS } from '@speckle/shared' +import { getPreviewServiceTimeoutMilliseconds } from '@/modules/shared/helpers/envHelper' + +interface QueueEventEmitter extends EventEmitter {} + +const defaultJobOptions = { + attempts: NumberOfPreviewRetries, + timeout: + NumberOfPreviewRetries * + (getPreviewServiceTimeoutMilliseconds() + + DelayBetweenPreviewRetriesMinutes * TIME_MS.minute), + backoff: { + type: 'fixed', + delay: DelayBetweenPreviewRetriesMinutes * TIME_MS.minute + }, + removeOnComplete: { + // retain completed jobs for 1 day or until it is the 100th completed job being retained, whichever comes first + age: 1 * TIME.day, + count: 100 + }, + removeOnFail: { + // retain completed jobs for 1 week or until it is the 1_000th failed job being retained, whichever comes first + age: 1 * TIME.week, + count: 1_000 + } +} + +export const addRequestQueueListeners = (params: { + requestQueue: QueueEventEmitter + requestErrorHandler: (err: Error) => void + requestFailedHandler: (job: Job, err: Error) => void + requestActiveHandler: (job: Job) => void +}) => { + const { + requestQueue, + requestErrorHandler, + requestFailedHandler, + requestActiveHandler + } = params + + // The error event is triggered when an error in the Redis backend is thrown. + requestQueue.removeListener('error', requestErrorHandler) + requestQueue.on('error', requestErrorHandler) + + // The failed event is triggered when a job fails by throwing an exception during execution. + // https://api.docs.bullmq.io/interfaces/v5.QueueEventsListener.html#failed + requestQueue.removeListener('failed', requestFailedHandler) + requestQueue.on('failed', requestFailedHandler) + + requestQueue.removeListener('active', requestActiveHandler) + requestQueue.on('active', requestActiveHandler) +} + +export const createRequestAndResponseQueues = async (params: { + redisUrl: string + requestQueueName: string + responseQueueName: string + requestErrorHandler: (err: Error) => void + requestFailedHandler: (job: Job, err: Error) => void + requestActiveHandler: (job: Job) => void +}): Promise<{ + requestQueue: Queue + responseQueue: Queue +}> => { + const { + redisUrl, + requestQueueName, + responseQueueName, + requestErrorHandler, + requestActiveHandler, + requestFailedHandler + } = params + + const previewRequestQueue = await initializeQueue({ + queueName: requestQueueName, + redisUrl, + options: { + defaultJobOptions + } + }) + + addRequestQueueListeners({ + requestQueue: previewRequestQueue, + requestErrorHandler, + requestFailedHandler, + requestActiveHandler + }) + + const previewResponseQueue = await initializeQueue({ + queueName: responseQueueName, + redisUrl + }) + + return { requestQueue: previewRequestQueue, responseQueue: previewResponseQueue } +} diff --git a/packages/server/modules/previews/domain/consts.ts b/packages/server/modules/previews/domain/consts.ts index 8bcaef05c..94f4f5e5d 100644 --- a/packages/server/modules/previews/domain/consts.ts +++ b/packages/server/modules/previews/domain/consts.ts @@ -10,3 +10,6 @@ export const PreviewPriority = { MEDIUM: 100, HIGH: 200 } as const + +export const DelayBetweenPreviewRetriesMinutes = 2 +export const NumberOfPreviewRetries = 2 diff --git a/packages/server/modules/previews/domain/operations.ts b/packages/server/modules/previews/domain/operations.ts index 196548e6b..565b0c10a 100644 --- a/packages/server/modules/previews/domain/operations.ts +++ b/packages/server/modules/previews/domain/operations.ts @@ -1,6 +1,8 @@ import type { ObjectPreview } from '@/modules/previews/domain/types' import type { Nullable, Optional, PartialBy } from '@speckle/shared' import type { Request, Response } from 'express' +import type { Logger } from '@/observability/logging' +import { PreviewResultPayload } from '@speckle/shared/workers/previews' export type GetObjectPreviewInfo = (params: { streamId: string @@ -16,10 +18,15 @@ export type ObjectPreviewInput = Pick< 'streamId' | 'objectId' | 'priority' > export type StoreObjectPreview = (params: ObjectPreviewInput) => Promise + export type UpsertObjectPreview = (params: { objectPreview: PartialBy }) => Promise +export type UpdateObjectPreview = (params: { + objectPreview: PartialBy +}) => Promise + export type ObjectPreviewRequest = { url: string token: string @@ -64,3 +71,24 @@ export type SendObjectPreview = ( export type CheckStreamPermissions = ( req: Request ) => Promise<{ hasPermissions: boolean; httpErrorCode: number }> + +export type ConsumePreviewResult = ({ + projectId, + objectId, + previewResult +}: { + projectId: string + objectId: string + previewResult: PreviewResultPayload +}) => Promise + +export type BuildConsumePreviewResult = (deps: { + logger: Logger + projectId: string +}) => Promise + +export type BuildUpdateObjectPreview = (params: { + projectId: string +}) => Promise + +export type ObserveMetrics = (params: { payload: PreviewResultPayload }) => void diff --git a/packages/server/modules/previews/index.ts b/packages/server/modules/previews/index.ts index 78721e181..e3f4e4523 100644 --- a/packages/server/modules/previews/index.ts +++ b/packages/server/modules/previews/index.ts @@ -1,6 +1,5 @@ /* istanbul ignore file */ import { moduleLogger, previewLogger as logger } from '@/observability/logging' -import { consumePreviewResultFactory } from '@/modules/previews/resultListener' import { disablePreviews, @@ -8,164 +7,95 @@ import { getRedisUrl, getServerOrigin } from '@/modules/shared/helpers/envHelper' -import { ensureError, TIME } from '@speckle/shared' +import type { Queue } from 'bull' +import { ensureError } from '@speckle/shared' import { previewRouterFactory } from '@/modules/previews/rest/router' import type { SpeckleModule } from '@/modules/shared/helpers/typeHelper' -import { - JobPayload, - PreviewResultPayload, - previewResultPayload -} from '@speckle/shared/workers/previews' -import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' -import { - storePreviewFactory, - upsertObjectPreviewFactory -} from '@/modules/previews/repository/previews' -import { getObjectCommitsWithStreamIdsFactory } from '@/modules/core/repositories/commits' import { initializeMetrics, - PreviewJobDurationStep + observeMetricsFactory } from '@/modules/previews/observability/metrics' -import { addRequestQueueListeners } from '@/modules/previews/queues/previews' -import { initializeQueue } from '@speckle/shared/queue' -import type Bull from 'bull' +import { responseHandlerFactory } from '@/modules/previews/services/responses' +import { createRequestAndResponseQueues } from '@/modules/previews/clients/bull' +import { buildConsumePreviewResult } from '@/modules/previews/resultListener' +import { + requestActiveHandlerFactory, + requestErrorHandlerFactory, + requestFailedHandlerFactory +} from '@/modules/previews/queues/previews' +import type { BuildUpdateObjectPreview } from '@/modules/previews/domain/operations' +import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' +import { updateObjectPreviewFactory } from '@/modules/previews/repository/previews' const JobQueueName = 'preview-service-jobs' const ResponseQueueNamePrefix = 'preview-service-results' -const getPreviewQueues = async (params: { responseQueueName: string }) => { - const { responseQueueName } = params - const redisUrl = getPreviewServiceRedisUrl() ?? getRedisUrl() - - // previews are requested on this queue - const previewRequestQueue = await initializeQueue({ - queueName: JobQueueName, - redisUrl - }) - addRequestQueueListeners({ - logger, - previewRequestQueue - }) - - // rendered previews are sent back on this queue - const previewResponseQueue = await initializeQueue({ - queueName: responseQueueName, - redisUrl - }) - - return { previewRequestQueue, previewResponseQueue } -} +const buildUpdateObjectPreviewFunction = + (): BuildUpdateObjectPreview => async (params) => { + const { projectId } = params + const projectDb = await getProjectDbClient({ projectId }) + return updateObjectPreviewFactory({ db: projectDb }) + } export const init: SpeckleModule['init'] = async ({ app, isInitial, metricsRegister }) => { - if (isInitial) { - if (disablePreviews()) { - moduleLogger.warn('📸 Object preview module is DISABLED') - } else { - moduleLogger.info('📸 Init object preview module') - } + if (!isInitial) return - const responseQueueName = `${ResponseQueueNamePrefix}-${ - new URL(getServerOrigin()).hostname - }` - - let previewRequestQueue: Bull.Queue - let previewResponseQueue: Bull.Queue - - try { - ;({ previewRequestQueue, previewResponseQueue } = await getPreviewQueues({ - responseQueueName - })) - } catch (e) { - const err = ensureError(e, 'Unknown error when creating preview queues') - moduleLogger.error( - { err }, - 'Could not create preview queues. Disabling previews.' - ) - return - } - - const { previewJobsProcessedSummary } = initializeMetrics({ - registers: [metricsRegister], - previewRequestQueue, - previewResponseQueue - }) - - const previewRouter = previewRouterFactory({ - previewRequestQueue, - responseQueueName - }) - app.use(previewRouter) - - void previewResponseQueue.process(async (payload, done) => { - const { attemptsMade } = payload - const parsedMessage = previewResultPayload - .refine((data) => data.jobId.split('.').length === 2, { - message: 'jobId must be in the format "projectId.objectId"' - }) - .transform((data) => ({ - ...data, - projectId: data.jobId.split('.')[0], - objectId: data.jobId.split('.')[1] - })) - .safeParse(payload.data) - if (!parsedMessage.success) { - logger.error( - { payload: payload.data, reason: parsedMessage.error }, - 'Failed to parse previewResult payload' - ) - - // as we can't parse the response we neither have a job ID nor a duration, - // we cannot get a duration to populate previewJobsProcessedSummary.observe - - done(parsedMessage.error) - return - } - const parsedResult = parsedMessage.data - const { projectId, objectId } = parsedResult - const jobLogger = logger.child({ - projectId, - objectId, - responsePriorAttemptsMade: attemptsMade - }) - - const projectDb = await getProjectDbClient({ projectId }) - await consumePreviewResultFactory({ - logger: jobLogger, - storePreview: storePreviewFactory({ db: projectDb }), - upsertObjectPreview: upsertObjectPreviewFactory({ db: projectDb }), - getObjectCommitsWithStreamIds: getObjectCommitsWithStreamIdsFactory({ - db: projectDb - }) - })({ - projectId, - objectId, - previewResult: parsedResult - }) - - previewJobsProcessedSummary.observe( - { status: parsedResult.status, step: PreviewJobDurationStep.TOTAL }, - parsedResult.result.durationSeconds * TIME.second - ) - if (parsedResult.result.loadDurationSeconds) { - previewJobsProcessedSummary.observe( - { status: parsedResult.status, step: PreviewJobDurationStep.LOAD }, - parsedResult.result.loadDurationSeconds * TIME.second - ) - } - if (parsedResult.result.renderDurationSeconds) { - previewJobsProcessedSummary.observe( - { status: parsedResult.status, step: PreviewJobDurationStep.RENDER }, - parsedResult.result.renderDurationSeconds * TIME.second - ) - } - - done() - }) + if (disablePreviews()) { + moduleLogger.warn('📸 Object preview module is DISABLED') + return + } else { + moduleLogger.info('📸 Init object preview module') } + + const responseQueueName = `${ResponseQueueNamePrefix}-${ + new URL(getServerOrigin()).hostname + }` + + let previewRequestQueue: Queue + let previewResponseQueue: Queue + + try { + ;({ requestQueue: previewRequestQueue, responseQueue: previewResponseQueue } = + await createRequestAndResponseQueues({ + redisUrl: getPreviewServiceRedisUrl() ?? getRedisUrl(), + requestQueueName: JobQueueName, + responseQueueName, + requestErrorHandler: requestErrorHandlerFactory({ logger }), + requestFailedHandler: requestFailedHandlerFactory({ + logger, + buildUpdateObjectPreview: buildUpdateObjectPreviewFunction() + }), + requestActiveHandler: requestActiveHandlerFactory({ logger }) + })) + } catch (e) { + const err = ensureError(e, 'Unknown error when creating preview queues') + moduleLogger.error({ err }, 'Could not create preview queues. Disabling previews.') + return + } + + const { previewJobsProcessedSummary } = initializeMetrics({ + registers: [metricsRegister], + previewRequestQueue, + previewResponseQueue + }) + + const previewRouter = previewRouterFactory({ + previewRequestQueue, + responseQueueName + }) + app.use(previewRouter) + + void previewResponseQueue.process( + responseHandlerFactory({ + observeMetrics: observeMetricsFactory({ summary: previewJobsProcessedSummary }), + logger, + consumePreviewResultBuilder: buildConsumePreviewResult + }) + ) } export const finalize = () => {} diff --git a/packages/server/modules/previews/observability/metrics.ts b/packages/server/modules/previews/observability/metrics.ts index a408aa583..967ba526c 100644 --- a/packages/server/modules/previews/observability/metrics.ts +++ b/packages/server/modules/previews/observability/metrics.ts @@ -1,6 +1,7 @@ import { TIME } from '@speckle/shared' import Bull from 'bull' import { type Registry, Counter, Summary, Gauge } from 'prom-client' +import { ObserveMetrics } from '@/modules/previews/domain/operations' export const PreviewJobDurationStep = { TOTAL: 'total', @@ -139,3 +140,28 @@ export const initializeMetrics = (params: { return { previewJobsProcessedSummary } } + +export const observeMetricsFactory = (deps: { + summary: Summary<'step' | 'status'> +}): ObserveMetrics => { + const { summary } = deps + return (params) => { + const { payload } = params + summary.observe( + { status: payload.status, step: PreviewJobDurationStep.TOTAL }, + payload.result.durationSeconds * TIME.second + ) + if (payload.result.loadDurationSeconds) { + summary.observe( + { status: payload.status, step: PreviewJobDurationStep.LOAD }, + payload.result.loadDurationSeconds * TIME.second + ) + } + if (payload.result.renderDurationSeconds) { + summary.observe( + { status: payload.status, step: PreviewJobDurationStep.RENDER }, + payload.result.renderDurationSeconds * TIME.second + ) + } + } +} diff --git a/packages/server/modules/previews/queues/previews.ts b/packages/server/modules/previews/queues/previews.ts index c0d9925d2..4e05dfb45 100644 --- a/packages/server/modules/previews/queues/previews.ts +++ b/packages/server/modules/previews/queues/previews.ts @@ -1,11 +1,12 @@ -import type { RequestObjectPreview } from '@/modules/previews/domain/operations' +import type { + BuildUpdateObjectPreview, + RequestObjectPreview +} from '@/modules/previews/domain/operations' import type { Logger } from '@/observability/logging' import type { Queue, Job } from 'bull' -import type { EventEmitter } from 'stream' -import { upsertObjectPreviewFactory } from '@/modules/previews/repository/previews' -import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { PreviewStatus } from '@/modules/previews/domain/consts' -import { JobPayload } from '@speckle/shared/workers/previews' +import type { JobPayload } from '@speckle/shared/workers/previews' +import { fromJobId, jobIdSchema } from '@speckle/shared/workers/previews' export const requestObjectPreviewFactory = ({ @@ -20,27 +21,29 @@ export const requestObjectPreviewFactory = await queue.add(payload, { removeOnComplete: true, attempts: 3 }) } -interface QueueEventEmitter extends EventEmitter {} - -export const addRequestQueueListeners = (params: { - logger: Logger - previewRequestQueue: QueueEventEmitter -}) => { - const { logger, previewRequestQueue } = params - - const requestErrorHandler = (err: Error) => { - logger.error({ err }, 'Preview generation failed') +export const requestErrorHandlerFactory = + (deps: { logger: Logger }) => (err: Error) => { + deps.logger.error( + { err }, + 'Preview generation resulted in an error due to the Redis backend.' + ) } - previewRequestQueue.removeListener('error', requestErrorHandler) - previewRequestQueue.on('error', requestErrorHandler) - const requestFailedHandler = async (job: Job, err: Error) => { +export const requestActiveHandlerFactory = (deps: { logger: Logger }) => (job: Job) => { + const jobId = 'jobId' in job.data ? job.data.jobId : undefined + deps.logger.info({ jobId }, 'Preview job {jobId} processing started.') +} + +export const requestFailedHandlerFactory = + (deps: { logger: Logger; buildUpdateObjectPreview: BuildUpdateObjectPreview }) => + async (job: Job, err: Error) => { + const { logger, buildUpdateObjectPreview } = deps const jobId = 'jobId' in job.data ? job.data.jobId : undefined logger.error({ err, jobId }, 'Preview job {jobId} failed.') if (!jobId) return - const [projectId, objectId] = jobId.split('.') - const projectDb = await getProjectDbClient({ projectId }) - await upsertObjectPreviewFactory({ db: projectDb })({ + const { projectId, objectId } = fromJobId(jobId) + const updateObjectPreview = await buildUpdateObjectPreview({ projectId }) + const updatedRecords = await updateObjectPreview({ objectPreview: { streamId: projectId, objectId, @@ -48,14 +51,16 @@ export const addRequestQueueListeners = (params: { lastUpdate: new Date() } }) + if (updatedRecords.length < 1) { + logger.warn( + { projectId, objectId }, + 'No object preview was updated for {projectId}.{objectId} after a failed preview job. This may indicate that the object preview does not exist or that the project does not exist, or has moved regions.' + ) + } + if (updatedRecords.length > 1) { + logger.warn( + { projectId, objectId, updatedRecords }, + 'Multiple object previews were updated for {projectId}.{objectId} after a failed preview job. This may indicate a data integrity issue.' + ) + } } - previewRequestQueue.removeListener('failed', requestFailedHandler) - previewRequestQueue.on('failed', requestFailedHandler) - - const requestActiveHandler = (job: Job) => { - const jobId = 'jobId' in job.data ? job.data.jobId : undefined - logger.info({ jobId }, 'Preview job {jobId} processing started.') - } - previewRequestQueue.removeListener('active', requestActiveHandler) - previewRequestQueue.on('active', requestActiveHandler) -} diff --git a/packages/server/modules/previews/repository/previews.ts b/packages/server/modules/previews/repository/previews.ts index b7c5e8746..36549a0a8 100644 --- a/packages/server/modules/previews/repository/previews.ts +++ b/packages/server/modules/previews/repository/previews.ts @@ -4,7 +4,7 @@ import { GetPreviewImage, StoreObjectPreview, StorePreview, - UpsertObjectPreview + UpdateObjectPreview } from '@/modules/previews/domain/operations' import { ObjectPreview as ObjectPreviewRecord, @@ -67,14 +67,17 @@ export const storePreviewFactory = await tables.previews(db).insert(preview).onConflict().ignore() } -export const upsertObjectPreviewFactory = - ({ db }: { db: Knex }): UpsertObjectPreview => +export const updateObjectPreviewFactory = + ({ db }: { db: Knex }): UpdateObjectPreview => async ({ objectPreview }) => { - await tables + return await tables .objectPreview(db) - .insert(objectPreview) - .onConflict(['streamId', 'objectId']) - .merge() + .where({ + streamId: objectPreview.streamId, + objectId: objectPreview.objectId + }) + .update(objectPreview) + .returning('*') } export const getPreviewImageFactory = diff --git a/packages/server/modules/previews/resultListener.ts b/packages/server/modules/previews/resultListener.ts index bce2e706c..1bc72c943 100644 --- a/packages/server/modules/previews/resultListener.ts +++ b/packages/server/modules/previews/resultListener.ts @@ -1,44 +1,58 @@ import { ProjectSubscriptions } from '@/modules/shared/utils/subscriptions' import { publish } from '@/modules/shared/utils/subscriptions' -import { PreviewResultPayload } from '@speckle/shared/workers/previews' +import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { throwUncoveredError } from '@speckle/shared' import type { Logger } from '@/observability/logging' import crypto from 'crypto' -import { StorePreview, UpsertObjectPreview } from '@/modules/previews/domain/operations' +import { + BuildConsumePreviewResult, + ConsumePreviewResult, + StorePreview, + UpdateObjectPreview +} from '@/modules/previews/domain/operations' import { joinImages } from 'join-images' import { GetObjectCommitsWithStreamIds } from '@/modules/core/domain/commits/operations' import { PreviewPriority, PreviewStatus } from '@/modules/previews/domain/consts' +import { + storePreviewFactory, + updateObjectPreviewFactory +} from '@/modules/previews/repository/previews' +import { getObjectCommitsWithStreamIdsFactory } from '@/modules/core/repositories/commits' + +export const buildConsumePreviewResult: BuildConsumePreviewResult = async (deps) => { + const { logger, projectId } = deps + const projectDb = await getProjectDbClient({ projectId }) + return consumePreviewResultFactory({ + logger, + storePreview: storePreviewFactory({ db: projectDb }), + updateObjectPreview: updateObjectPreviewFactory({ db: projectDb }), + getObjectCommitsWithStreamIds: getObjectCommitsWithStreamIdsFactory({ + db: projectDb + }) + }) +} export const consumePreviewResultFactory = ({ logger, - upsertObjectPreview, + updateObjectPreview, storePreview, getObjectCommitsWithStreamIds }: { logger: Logger - upsertObjectPreview: UpsertObjectPreview + updateObjectPreview: UpdateObjectPreview storePreview: StorePreview getObjectCommitsWithStreamIds: GetObjectCommitsWithStreamIds - }) => - async ({ - projectId, - objectId, - previewResult - }: { - projectId: string - objectId: string - previewResult: PreviewResultPayload - }) => { - const streamId = projectId + }): ConsumePreviewResult => + async ({ projectId, objectId, previewResult }) => { const lastUpdate = new Date() const priority = PreviewPriority.LOW const log = logger.child({ jobId: previewResult.jobId, status: previewResult.status, durationSeconds: previewResult.result.durationSeconds, - projectId: streamId, - streamId, // for legacy reasons + projectId, + streamId: projectId, // for legacy reasons objectId }) @@ -48,10 +62,10 @@ export const consumePreviewResultFactory = switch (previewResult.status) { case 'error': log.warn({ reason: previewResult.reason }, previewMessage) - await upsertObjectPreview({ + await updateObjectPreview({ objectPreview: { objectId, - streamId, + streamId: projectId, lastUpdate, preview: { err: previewResult.reason }, priority, @@ -97,10 +111,10 @@ export const consumePreviewResultFactory = preview['all'] = fullImgId - await upsertObjectPreview({ + await updateObjectPreview({ objectPreview: { objectId, - streamId, + streamId: projectId, lastUpdate, preview, priority, @@ -108,7 +122,7 @@ export const consumePreviewResultFactory = } }) const commits = await getObjectCommitsWithStreamIds([objectId], { - streamIds: [streamId] + streamIds: [projectId] }) if (!commits.length) break diff --git a/packages/server/modules/previews/services/createObjectPreview.ts b/packages/server/modules/previews/services/createObjectPreview.ts index 7fd72da8d..59c5218cb 100644 --- a/packages/server/modules/previews/services/createObjectPreview.ts +++ b/packages/server/modules/previews/services/createObjectPreview.ts @@ -8,6 +8,7 @@ import { } from '@/modules/previews/domain/operations' import { Roles, Scopes, TIME_MS } from '@speckle/shared' import { TokenResourceIdentifierType } from '@/modules/core/domain/tokens/types' +import { toJobId } from '@speckle/shared/workers/previews' export const createObjectPreviewFactory = ({ @@ -39,10 +40,12 @@ export const createObjectPreviewFactory = return false } + const jobId = toJobId({ projectId: streamId, objectId }) + // we're running the preview generation in the name of a project owner const token = await createAppToken({ appId: DefaultAppIds.Web, - name: `preview-${streamId}@${objectId}`, + name: `preview-${jobId}`, userId, scopes: [Scopes.Streams.Read], lifespan: 2 * TIME_MS.hour, @@ -58,6 +61,10 @@ export const createObjectPreviewFactory = serverOrigin ).toString() - await requestObjectPreview({ jobId: `${streamId}.${objectId}`, token, url }) + await requestObjectPreview({ + jobId, + token, + url + }) return true } diff --git a/packages/server/modules/previews/services/responses.ts b/packages/server/modules/previews/services/responses.ts new file mode 100644 index 000000000..5fc74e52a --- /dev/null +++ b/packages/server/modules/previews/services/responses.ts @@ -0,0 +1,85 @@ +import { ensureError } from '@speckle/shared' +import type { Logger } from '@/observability/logging' +import type { DoneCallback, Job } from 'bull' +import type { + BuildConsumePreviewResult, + ObserveMetrics +} from '@/modules/previews/domain/operations' +import { StreamNotFoundError } from '@/modules/core/errors/stream' +import { fromJobId, previewResultPayload } from '@speckle/shared/workers/previews' + +const parseMessage = (data: string) => + previewResultPayload + .transform((data) => { + const { projectId, objectId } = fromJobId(data.jobId) + return { + ...data, + projectId, + objectId + } + }) + .safeParse(data) + +export const responseHandlerFactory = (deps: { + consumePreviewResultBuilder: BuildConsumePreviewResult + observeMetrics: ObserveMetrics + logger: Logger +}) => { + const { observeMetrics, logger, consumePreviewResultBuilder } = deps + return async (payload: Pick, done: DoneCallback) => { + const { attemptsMade } = payload + const parsedMessage = parseMessage(payload.data) + if (!parsedMessage.success) { + logger.error( + { payload: payload.data, reason: parsedMessage.error }, + 'Failed to parse previewResult payload' + ) + + // as we can't parse the response we neither have a job ID nor a duration, + // we cannot get a duration to populate previewJobsProcessedSummary.observe + + done(parsedMessage.error) + return + } + + const parsedPayload = parsedMessage.data + const { projectId, objectId } = parsedPayload + const jobLogger = logger.child({ + projectId, + objectId, + responsePriorAttemptsMade: attemptsMade + }) + + try { + observeMetrics({ payload: parsedPayload }) + + const consumePreviewResult = await consumePreviewResultBuilder({ + logger: jobLogger, + projectId + }) + + await consumePreviewResult({ + projectId, + objectId, + previewResult: parsedPayload + }) + } catch (e) { + const err = ensureError(e, 'Unknown error when consuming preview result') + + if (err instanceof StreamNotFoundError) { + jobLogger.warn( + { err }, + 'Failed to consume preview result; the stream does not exist. Probably deleted while preview was generated.' + ) + done() // don't pass the error to done, as we don't want to retry the job + return + } + + jobLogger.error({ err }, 'Failed to consume preview result') + done(err) + return + } + + done() + } +} diff --git a/packages/server/modules/previews/tests/unit/responses.spec.ts b/packages/server/modules/previews/tests/unit/responses.spec.ts new file mode 100644 index 000000000..c6ed7d7fd --- /dev/null +++ b/packages/server/modules/previews/tests/unit/responses.spec.ts @@ -0,0 +1,47 @@ +import { expect } from 'chai' +import { responseHandlerFactory } from '@/modules/previews/services/responses' +import { testLogger as logger } from '@/observability/logging' +import { buildConsumePreviewResult } from '@/modules/previews/resultListener' +import cryptoRandomString from 'crypto-random-string' + +describe('object preview @previews', () => { + describe('responseHandlerFactory creates a function, that', () => { + beforeEach(() => {}) + it('gracefully handles a response for a non-existent (e.g. deleted in meantime) project', async () => { + const handleResponse = responseHandlerFactory({ + observeMetrics: () => {}, + logger, + consumePreviewResultBuilder: buildConsumePreviewResult + }) + let doneCalled = false + let doneErr: Error | null | undefined = null + await expect( + handleResponse( + { + data: { + jobId: `${cryptoRandomString({ length: 10 })}.${cryptoRandomString({ + length: 10 + })}`, + status: 'success', + result: { + durationSeconds: 0, + loadDurationSeconds: 0, + renderDurationSeconds: 0, + screenshots: { + '0': 'data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAgAAAAIAQMAAAD+wSzIAAAABlBMVEX///+/v7+jQ3Y5AAAADklEQVQI12P4AIX8EAgALgAD/aNpbtEAAAAASUVORK5CYII' + } + } + }, + attemptsMade: 0 + }, + (err) => { + doneCalled = true + doneErr = err + } + ) + ).eventually.be.fulfilled + expect(doneCalled).to.be.true + expect(doneErr).to.be.undefined + }) + }) +}) diff --git a/packages/server/modules/shared/helpers/envHelper.ts b/packages/server/modules/shared/helpers/envHelper.ts index 521181a9c..4da4b9798 100644 --- a/packages/server/modules/shared/helpers/envHelper.ts +++ b/packages/server/modules/shared/helpers/envHelper.ts @@ -497,3 +497,7 @@ export const isRateLimiterEnabled = (): boolean => { export const getFileUploadUrlExpiryMinutes = (): number => { return getIntFromEnv('FILE_UPLOAD_URL_EXPIRY_MINUTES', '1440') } + +export const getPreviewServiceTimeoutMilliseconds = (): number => { + return getIntFromEnv('PREVIEW_SERVICE_TIMEOUT_MILLISECONDS', '3600000') // 1 hour +} diff --git a/packages/shared/src/workers/previews/job.ts b/packages/shared/src/workers/previews/job.ts index 2e95596ed..100ac692c 100644 --- a/packages/shared/src/workers/previews/job.ts +++ b/packages/shared/src/workers/previews/job.ts @@ -1,7 +1,29 @@ import z from 'zod' +const JobIdSeparator = '.' + +export const jobIdSchema = z + .string() + .refine((data) => data.split(JobIdSeparator).length === 2, { + message: 'jobId must be in the format "projectId.objectId"' + }) + +export const toJobId = (params: { projectId: string; objectId: string }): string => { + return `${params.projectId}${JobIdSeparator}${params.objectId}` +} +export const fromJobId = (jobId: string): { projectId: string; objectId: string } => { + return jobIdSchema + .transform((data) => { + return { + projectId: data.split(JobIdSeparator)[0], + objectId: data.split(JobIdSeparator)[1] + } + }) + .parse(jobId) +} + const job = z.object({ - jobId: z.string() + jobId: jobIdSchema }) export const jobPayload = job.merge( diff --git a/utils/helm/speckle-server/templates/_helpers.tpl b/utils/helm/speckle-server/templates/_helpers.tpl index b0688f0a4..f5ecffce2 100644 --- a/utils/helm/speckle-server/templates/_helpers.tpl +++ b/utils/helm/speckle-server/templates/_helpers.tpl @@ -761,6 +761,10 @@ Generate the environment variables for Speckle server and Speckle objects deploy {{- if .Values.preview_service.enabled }} - name: PREVIEW_SERVICE_USE_PRIVATE_OBJECTS_SERVER_URL value: "true" +{{- if .Values.preview_service.puppeteer.timeoutMilliseconds }} +- name: PREVIEW_SERVICE_TIMEOUT_MILLISECONDS + value: {{ .Values.preview_service.puppeteer.timeoutMilliseconds | quote }} +{{- end }} {{- end }} # *** Redis ***