chore(server/previews): handle case where project deleted while preview generated (#4612)
This commit is contained in:
@@ -0,0 +1,103 @@
|
||||
import { Queue, type Job } from 'bull'
|
||||
import type { EventEmitter } from 'stream'
|
||||
import { initializeQueue } from '@speckle/shared/queue'
|
||||
import { JobPayload, PreviewResultPayload } from '@speckle/shared/workers/previews'
|
||||
import {
|
||||
DelayBetweenPreviewRetriesMinutes,
|
||||
NumberOfPreviewRetries
|
||||
} from '@/modules/previews/domain/consts'
|
||||
import { TIME, TIME_MS } from '@speckle/shared'
|
||||
import { getPreviewServiceTimeoutMilliseconds } from '@/modules/shared/helpers/envHelper'
|
||||
|
||||
interface QueueEventEmitter extends EventEmitter {}
|
||||
|
||||
const defaultJobOptions = {
|
||||
attempts: NumberOfPreviewRetries,
|
||||
timeout:
|
||||
NumberOfPreviewRetries *
|
||||
(getPreviewServiceTimeoutMilliseconds() +
|
||||
DelayBetweenPreviewRetriesMinutes * TIME_MS.minute),
|
||||
backoff: {
|
||||
type: 'fixed',
|
||||
delay: DelayBetweenPreviewRetriesMinutes * TIME_MS.minute
|
||||
},
|
||||
removeOnComplete: {
|
||||
// retain completed jobs for 1 day or until it is the 100th completed job being retained, whichever comes first
|
||||
age: 1 * TIME.day,
|
||||
count: 100
|
||||
},
|
||||
removeOnFail: {
|
||||
// retain completed jobs for 1 week or until it is the 1_000th failed job being retained, whichever comes first
|
||||
age: 1 * TIME.week,
|
||||
count: 1_000
|
||||
}
|
||||
}
|
||||
|
||||
export const addRequestQueueListeners = (params: {
|
||||
requestQueue: QueueEventEmitter
|
||||
requestErrorHandler: (err: Error) => void
|
||||
requestFailedHandler: (job: Job, err: Error) => void
|
||||
requestActiveHandler: (job: Job) => void
|
||||
}) => {
|
||||
const {
|
||||
requestQueue,
|
||||
requestErrorHandler,
|
||||
requestFailedHandler,
|
||||
requestActiveHandler
|
||||
} = params
|
||||
|
||||
// The error event is triggered when an error in the Redis backend is thrown.
|
||||
requestQueue.removeListener('error', requestErrorHandler)
|
||||
requestQueue.on('error', requestErrorHandler)
|
||||
|
||||
// The failed event is triggered when a job fails by throwing an exception during execution.
|
||||
// https://api.docs.bullmq.io/interfaces/v5.QueueEventsListener.html#failed
|
||||
requestQueue.removeListener('failed', requestFailedHandler)
|
||||
requestQueue.on('failed', requestFailedHandler)
|
||||
|
||||
requestQueue.removeListener('active', requestActiveHandler)
|
||||
requestQueue.on('active', requestActiveHandler)
|
||||
}
|
||||
|
||||
export const createRequestAndResponseQueues = async (params: {
|
||||
redisUrl: string
|
||||
requestQueueName: string
|
||||
responseQueueName: string
|
||||
requestErrorHandler: (err: Error) => void
|
||||
requestFailedHandler: (job: Job, err: Error) => void
|
||||
requestActiveHandler: (job: Job) => void
|
||||
}): Promise<{
|
||||
requestQueue: Queue<JobPayload>
|
||||
responseQueue: Queue<PreviewResultPayload>
|
||||
}> => {
|
||||
const {
|
||||
redisUrl,
|
||||
requestQueueName,
|
||||
responseQueueName,
|
||||
requestErrorHandler,
|
||||
requestActiveHandler,
|
||||
requestFailedHandler
|
||||
} = params
|
||||
|
||||
const previewRequestQueue = await initializeQueue<JobPayload>({
|
||||
queueName: requestQueueName,
|
||||
redisUrl,
|
||||
options: {
|
||||
defaultJobOptions
|
||||
}
|
||||
})
|
||||
|
||||
addRequestQueueListeners({
|
||||
requestQueue: previewRequestQueue,
|
||||
requestErrorHandler,
|
||||
requestFailedHandler,
|
||||
requestActiveHandler
|
||||
})
|
||||
|
||||
const previewResponseQueue = await initializeQueue<PreviewResultPayload>({
|
||||
queueName: responseQueueName,
|
||||
redisUrl
|
||||
})
|
||||
|
||||
return { requestQueue: previewRequestQueue, responseQueue: previewResponseQueue }
|
||||
}
|
||||
@@ -10,3 +10,6 @@ export const PreviewPriority = {
|
||||
MEDIUM: 100,
|
||||
HIGH: 200
|
||||
} as const
|
||||
|
||||
export const DelayBetweenPreviewRetriesMinutes = 2
|
||||
export const NumberOfPreviewRetries = 2
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import type { ObjectPreview } from '@/modules/previews/domain/types'
|
||||
import type { Nullable, Optional, PartialBy } from '@speckle/shared'
|
||||
import type { Request, Response } from 'express'
|
||||
import type { Logger } from '@/observability/logging'
|
||||
import { PreviewResultPayload } from '@speckle/shared/workers/previews'
|
||||
|
||||
export type GetObjectPreviewInfo = (params: {
|
||||
streamId: string
|
||||
@@ -16,10 +18,15 @@ export type ObjectPreviewInput = Pick<
|
||||
'streamId' | 'objectId' | 'priority'
|
||||
>
|
||||
export type StoreObjectPreview = (params: ObjectPreviewInput) => Promise<void>
|
||||
|
||||
export type UpsertObjectPreview = (params: {
|
||||
objectPreview: PartialBy<ObjectPreview, 'preview' | 'priority'>
|
||||
}) => Promise<void>
|
||||
|
||||
export type UpdateObjectPreview = (params: {
|
||||
objectPreview: PartialBy<ObjectPreview, 'preview' | 'priority'>
|
||||
}) => Promise<ObjectPreview[]>
|
||||
|
||||
export type ObjectPreviewRequest = {
|
||||
url: string
|
||||
token: string
|
||||
@@ -64,3 +71,24 @@ export type SendObjectPreview = (
|
||||
export type CheckStreamPermissions = (
|
||||
req: Request
|
||||
) => Promise<{ hasPermissions: boolean; httpErrorCode: number }>
|
||||
|
||||
export type ConsumePreviewResult = ({
|
||||
projectId,
|
||||
objectId,
|
||||
previewResult
|
||||
}: {
|
||||
projectId: string
|
||||
objectId: string
|
||||
previewResult: PreviewResultPayload
|
||||
}) => Promise<void>
|
||||
|
||||
export type BuildConsumePreviewResult = (deps: {
|
||||
logger: Logger
|
||||
projectId: string
|
||||
}) => Promise<ConsumePreviewResult>
|
||||
|
||||
export type BuildUpdateObjectPreview = (params: {
|
||||
projectId: string
|
||||
}) => Promise<UpdateObjectPreview>
|
||||
|
||||
export type ObserveMetrics = (params: { payload: PreviewResultPayload }) => void
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
/* istanbul ignore file */
|
||||
import { moduleLogger, previewLogger as logger } from '@/observability/logging'
|
||||
import { consumePreviewResultFactory } from '@/modules/previews/resultListener'
|
||||
|
||||
import {
|
||||
disablePreviews,
|
||||
@@ -8,164 +7,95 @@ import {
|
||||
getRedisUrl,
|
||||
getServerOrigin
|
||||
} from '@/modules/shared/helpers/envHelper'
|
||||
import { ensureError, TIME } from '@speckle/shared'
|
||||
import type { Queue } from 'bull'
|
||||
import { ensureError } from '@speckle/shared'
|
||||
import { previewRouterFactory } from '@/modules/previews/rest/router'
|
||||
import type { SpeckleModule } from '@/modules/shared/helpers/typeHelper'
|
||||
import {
|
||||
JobPayload,
|
||||
PreviewResultPayload,
|
||||
previewResultPayload
|
||||
} from '@speckle/shared/workers/previews'
|
||||
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
|
||||
import {
|
||||
storePreviewFactory,
|
||||
upsertObjectPreviewFactory
|
||||
} from '@/modules/previews/repository/previews'
|
||||
import { getObjectCommitsWithStreamIdsFactory } from '@/modules/core/repositories/commits'
|
||||
import {
|
||||
initializeMetrics,
|
||||
PreviewJobDurationStep
|
||||
observeMetricsFactory
|
||||
} from '@/modules/previews/observability/metrics'
|
||||
import { addRequestQueueListeners } from '@/modules/previews/queues/previews'
|
||||
import { initializeQueue } from '@speckle/shared/queue'
|
||||
import type Bull from 'bull'
|
||||
import { responseHandlerFactory } from '@/modules/previews/services/responses'
|
||||
import { createRequestAndResponseQueues } from '@/modules/previews/clients/bull'
|
||||
import { buildConsumePreviewResult } from '@/modules/previews/resultListener'
|
||||
import {
|
||||
requestActiveHandlerFactory,
|
||||
requestErrorHandlerFactory,
|
||||
requestFailedHandlerFactory
|
||||
} from '@/modules/previews/queues/previews'
|
||||
import type { BuildUpdateObjectPreview } from '@/modules/previews/domain/operations'
|
||||
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
|
||||
import { updateObjectPreviewFactory } from '@/modules/previews/repository/previews'
|
||||
|
||||
const JobQueueName = 'preview-service-jobs'
|
||||
const ResponseQueueNamePrefix = 'preview-service-results'
|
||||
|
||||
const getPreviewQueues = async (params: { responseQueueName: string }) => {
|
||||
const { responseQueueName } = params
|
||||
const redisUrl = getPreviewServiceRedisUrl() ?? getRedisUrl()
|
||||
|
||||
// previews are requested on this queue
|
||||
const previewRequestQueue = await initializeQueue<JobPayload>({
|
||||
queueName: JobQueueName,
|
||||
redisUrl
|
||||
})
|
||||
addRequestQueueListeners({
|
||||
logger,
|
||||
previewRequestQueue
|
||||
})
|
||||
|
||||
// rendered previews are sent back on this queue
|
||||
const previewResponseQueue = await initializeQueue<PreviewResultPayload>({
|
||||
queueName: responseQueueName,
|
||||
redisUrl
|
||||
})
|
||||
|
||||
return { previewRequestQueue, previewResponseQueue }
|
||||
}
|
||||
const buildUpdateObjectPreviewFunction =
|
||||
(): BuildUpdateObjectPreview => async (params) => {
|
||||
const { projectId } = params
|
||||
const projectDb = await getProjectDbClient({ projectId })
|
||||
return updateObjectPreviewFactory({ db: projectDb })
|
||||
}
|
||||
|
||||
export const init: SpeckleModule['init'] = async ({
|
||||
app,
|
||||
isInitial,
|
||||
metricsRegister
|
||||
}) => {
|
||||
if (isInitial) {
|
||||
if (disablePreviews()) {
|
||||
moduleLogger.warn('📸 Object preview module is DISABLED')
|
||||
} else {
|
||||
moduleLogger.info('📸 Init object preview module')
|
||||
}
|
||||
if (!isInitial) return
|
||||
|
||||
const responseQueueName = `${ResponseQueueNamePrefix}-${
|
||||
new URL(getServerOrigin()).hostname
|
||||
}`
|
||||
|
||||
let previewRequestQueue: Bull.Queue<JobPayload>
|
||||
let previewResponseQueue: Bull.Queue<PreviewResultPayload>
|
||||
|
||||
try {
|
||||
;({ previewRequestQueue, previewResponseQueue } = await getPreviewQueues({
|
||||
responseQueueName
|
||||
}))
|
||||
} catch (e) {
|
||||
const err = ensureError(e, 'Unknown error when creating preview queues')
|
||||
moduleLogger.error(
|
||||
{ err },
|
||||
'Could not create preview queues. Disabling previews.'
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
const { previewJobsProcessedSummary } = initializeMetrics({
|
||||
registers: [metricsRegister],
|
||||
previewRequestQueue,
|
||||
previewResponseQueue
|
||||
})
|
||||
|
||||
const previewRouter = previewRouterFactory({
|
||||
previewRequestQueue,
|
||||
responseQueueName
|
||||
})
|
||||
app.use(previewRouter)
|
||||
|
||||
void previewResponseQueue.process(async (payload, done) => {
|
||||
const { attemptsMade } = payload
|
||||
const parsedMessage = previewResultPayload
|
||||
.refine((data) => data.jobId.split('.').length === 2, {
|
||||
message: 'jobId must be in the format "projectId.objectId"'
|
||||
})
|
||||
.transform((data) => ({
|
||||
...data,
|
||||
projectId: data.jobId.split('.')[0],
|
||||
objectId: data.jobId.split('.')[1]
|
||||
}))
|
||||
.safeParse(payload.data)
|
||||
if (!parsedMessage.success) {
|
||||
logger.error(
|
||||
{ payload: payload.data, reason: parsedMessage.error },
|
||||
'Failed to parse previewResult payload'
|
||||
)
|
||||
|
||||
// as we can't parse the response we neither have a job ID nor a duration,
|
||||
// we cannot get a duration to populate previewJobsProcessedSummary.observe
|
||||
|
||||
done(parsedMessage.error)
|
||||
return
|
||||
}
|
||||
const parsedResult = parsedMessage.data
|
||||
const { projectId, objectId } = parsedResult
|
||||
const jobLogger = logger.child({
|
||||
projectId,
|
||||
objectId,
|
||||
responsePriorAttemptsMade: attemptsMade
|
||||
})
|
||||
|
||||
const projectDb = await getProjectDbClient({ projectId })
|
||||
await consumePreviewResultFactory({
|
||||
logger: jobLogger,
|
||||
storePreview: storePreviewFactory({ db: projectDb }),
|
||||
upsertObjectPreview: upsertObjectPreviewFactory({ db: projectDb }),
|
||||
getObjectCommitsWithStreamIds: getObjectCommitsWithStreamIdsFactory({
|
||||
db: projectDb
|
||||
})
|
||||
})({
|
||||
projectId,
|
||||
objectId,
|
||||
previewResult: parsedResult
|
||||
})
|
||||
|
||||
previewJobsProcessedSummary.observe(
|
||||
{ status: parsedResult.status, step: PreviewJobDurationStep.TOTAL },
|
||||
parsedResult.result.durationSeconds * TIME.second
|
||||
)
|
||||
if (parsedResult.result.loadDurationSeconds) {
|
||||
previewJobsProcessedSummary.observe(
|
||||
{ status: parsedResult.status, step: PreviewJobDurationStep.LOAD },
|
||||
parsedResult.result.loadDurationSeconds * TIME.second
|
||||
)
|
||||
}
|
||||
if (parsedResult.result.renderDurationSeconds) {
|
||||
previewJobsProcessedSummary.observe(
|
||||
{ status: parsedResult.status, step: PreviewJobDurationStep.RENDER },
|
||||
parsedResult.result.renderDurationSeconds * TIME.second
|
||||
)
|
||||
}
|
||||
|
||||
done()
|
||||
})
|
||||
if (disablePreviews()) {
|
||||
moduleLogger.warn('📸 Object preview module is DISABLED')
|
||||
return
|
||||
} else {
|
||||
moduleLogger.info('📸 Init object preview module')
|
||||
}
|
||||
|
||||
const responseQueueName = `${ResponseQueueNamePrefix}-${
|
||||
new URL(getServerOrigin()).hostname
|
||||
}`
|
||||
|
||||
let previewRequestQueue: Queue
|
||||
let previewResponseQueue: Queue
|
||||
|
||||
try {
|
||||
;({ requestQueue: previewRequestQueue, responseQueue: previewResponseQueue } =
|
||||
await createRequestAndResponseQueues({
|
||||
redisUrl: getPreviewServiceRedisUrl() ?? getRedisUrl(),
|
||||
requestQueueName: JobQueueName,
|
||||
responseQueueName,
|
||||
requestErrorHandler: requestErrorHandlerFactory({ logger }),
|
||||
requestFailedHandler: requestFailedHandlerFactory({
|
||||
logger,
|
||||
buildUpdateObjectPreview: buildUpdateObjectPreviewFunction()
|
||||
}),
|
||||
requestActiveHandler: requestActiveHandlerFactory({ logger })
|
||||
}))
|
||||
} catch (e) {
|
||||
const err = ensureError(e, 'Unknown error when creating preview queues')
|
||||
moduleLogger.error({ err }, 'Could not create preview queues. Disabling previews.')
|
||||
return
|
||||
}
|
||||
|
||||
const { previewJobsProcessedSummary } = initializeMetrics({
|
||||
registers: [metricsRegister],
|
||||
previewRequestQueue,
|
||||
previewResponseQueue
|
||||
})
|
||||
|
||||
const previewRouter = previewRouterFactory({
|
||||
previewRequestQueue,
|
||||
responseQueueName
|
||||
})
|
||||
app.use(previewRouter)
|
||||
|
||||
void previewResponseQueue.process(
|
||||
responseHandlerFactory({
|
||||
observeMetrics: observeMetricsFactory({ summary: previewJobsProcessedSummary }),
|
||||
logger,
|
||||
consumePreviewResultBuilder: buildConsumePreviewResult
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
export const finalize = () => {}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { TIME } from '@speckle/shared'
|
||||
import Bull from 'bull'
|
||||
import { type Registry, Counter, Summary, Gauge } from 'prom-client'
|
||||
import { ObserveMetrics } from '@/modules/previews/domain/operations'
|
||||
|
||||
export const PreviewJobDurationStep = {
|
||||
TOTAL: 'total',
|
||||
@@ -139,3 +140,28 @@ export const initializeMetrics = (params: {
|
||||
|
||||
return { previewJobsProcessedSummary }
|
||||
}
|
||||
|
||||
export const observeMetricsFactory = (deps: {
|
||||
summary: Summary<'step' | 'status'>
|
||||
}): ObserveMetrics => {
|
||||
const { summary } = deps
|
||||
return (params) => {
|
||||
const { payload } = params
|
||||
summary.observe(
|
||||
{ status: payload.status, step: PreviewJobDurationStep.TOTAL },
|
||||
payload.result.durationSeconds * TIME.second
|
||||
)
|
||||
if (payload.result.loadDurationSeconds) {
|
||||
summary.observe(
|
||||
{ status: payload.status, step: PreviewJobDurationStep.LOAD },
|
||||
payload.result.loadDurationSeconds * TIME.second
|
||||
)
|
||||
}
|
||||
if (payload.result.renderDurationSeconds) {
|
||||
summary.observe(
|
||||
{ status: payload.status, step: PreviewJobDurationStep.RENDER },
|
||||
payload.result.renderDurationSeconds * TIME.second
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
import type { RequestObjectPreview } from '@/modules/previews/domain/operations'
|
||||
import type {
|
||||
BuildUpdateObjectPreview,
|
||||
RequestObjectPreview
|
||||
} from '@/modules/previews/domain/operations'
|
||||
import type { Logger } from '@/observability/logging'
|
||||
import type { Queue, Job } from 'bull'
|
||||
import type { EventEmitter } from 'stream'
|
||||
import { upsertObjectPreviewFactory } from '@/modules/previews/repository/previews'
|
||||
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
|
||||
import { PreviewStatus } from '@/modules/previews/domain/consts'
|
||||
import { JobPayload } from '@speckle/shared/workers/previews'
|
||||
import type { JobPayload } from '@speckle/shared/workers/previews'
|
||||
import { fromJobId, jobIdSchema } from '@speckle/shared/workers/previews'
|
||||
|
||||
export const requestObjectPreviewFactory =
|
||||
({
|
||||
@@ -20,27 +21,29 @@ export const requestObjectPreviewFactory =
|
||||
await queue.add(payload, { removeOnComplete: true, attempts: 3 })
|
||||
}
|
||||
|
||||
interface QueueEventEmitter extends EventEmitter {}
|
||||
|
||||
export const addRequestQueueListeners = (params: {
|
||||
logger: Logger
|
||||
previewRequestQueue: QueueEventEmitter
|
||||
}) => {
|
||||
const { logger, previewRequestQueue } = params
|
||||
|
||||
const requestErrorHandler = (err: Error) => {
|
||||
logger.error({ err }, 'Preview generation failed')
|
||||
export const requestErrorHandlerFactory =
|
||||
(deps: { logger: Logger }) => (err: Error) => {
|
||||
deps.logger.error(
|
||||
{ err },
|
||||
'Preview generation resulted in an error due to the Redis backend.'
|
||||
)
|
||||
}
|
||||
previewRequestQueue.removeListener('error', requestErrorHandler)
|
||||
previewRequestQueue.on('error', requestErrorHandler)
|
||||
|
||||
const requestFailedHandler = async (job: Job, err: Error) => {
|
||||
export const requestActiveHandlerFactory = (deps: { logger: Logger }) => (job: Job) => {
|
||||
const jobId = 'jobId' in job.data ? job.data.jobId : undefined
|
||||
deps.logger.info({ jobId }, 'Preview job {jobId} processing started.')
|
||||
}
|
||||
|
||||
export const requestFailedHandlerFactory =
|
||||
(deps: { logger: Logger; buildUpdateObjectPreview: BuildUpdateObjectPreview }) =>
|
||||
async (job: Job, err: Error) => {
|
||||
const { logger, buildUpdateObjectPreview } = deps
|
||||
const jobId = 'jobId' in job.data ? job.data.jobId : undefined
|
||||
logger.error({ err, jobId }, 'Preview job {jobId} failed.')
|
||||
if (!jobId) return
|
||||
const [projectId, objectId] = jobId.split('.')
|
||||
const projectDb = await getProjectDbClient({ projectId })
|
||||
await upsertObjectPreviewFactory({ db: projectDb })({
|
||||
const { projectId, objectId } = fromJobId(jobId)
|
||||
const updateObjectPreview = await buildUpdateObjectPreview({ projectId })
|
||||
const updatedRecords = await updateObjectPreview({
|
||||
objectPreview: {
|
||||
streamId: projectId,
|
||||
objectId,
|
||||
@@ -48,14 +51,16 @@ export const addRequestQueueListeners = (params: {
|
||||
lastUpdate: new Date()
|
||||
}
|
||||
})
|
||||
if (updatedRecords.length < 1) {
|
||||
logger.warn(
|
||||
{ projectId, objectId },
|
||||
'No object preview was updated for {projectId}.{objectId} after a failed preview job. This may indicate that the object preview does not exist or that the project does not exist, or has moved regions.'
|
||||
)
|
||||
}
|
||||
if (updatedRecords.length > 1) {
|
||||
logger.warn(
|
||||
{ projectId, objectId, updatedRecords },
|
||||
'Multiple object previews were updated for {projectId}.{objectId} after a failed preview job. This may indicate a data integrity issue.'
|
||||
)
|
||||
}
|
||||
}
|
||||
previewRequestQueue.removeListener('failed', requestFailedHandler)
|
||||
previewRequestQueue.on('failed', requestFailedHandler)
|
||||
|
||||
const requestActiveHandler = (job: Job) => {
|
||||
const jobId = 'jobId' in job.data ? job.data.jobId : undefined
|
||||
logger.info({ jobId }, 'Preview job {jobId} processing started.')
|
||||
}
|
||||
previewRequestQueue.removeListener('active', requestActiveHandler)
|
||||
previewRequestQueue.on('active', requestActiveHandler)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ import {
|
||||
GetPreviewImage,
|
||||
StoreObjectPreview,
|
||||
StorePreview,
|
||||
UpsertObjectPreview
|
||||
UpdateObjectPreview
|
||||
} from '@/modules/previews/domain/operations'
|
||||
import {
|
||||
ObjectPreview as ObjectPreviewRecord,
|
||||
@@ -67,14 +67,17 @@ export const storePreviewFactory =
|
||||
await tables.previews(db).insert(preview).onConflict().ignore()
|
||||
}
|
||||
|
||||
export const upsertObjectPreviewFactory =
|
||||
({ db }: { db: Knex }): UpsertObjectPreview =>
|
||||
export const updateObjectPreviewFactory =
|
||||
({ db }: { db: Knex }): UpdateObjectPreview =>
|
||||
async ({ objectPreview }) => {
|
||||
await tables
|
||||
return await tables
|
||||
.objectPreview(db)
|
||||
.insert(objectPreview)
|
||||
.onConflict(['streamId', 'objectId'])
|
||||
.merge()
|
||||
.where({
|
||||
streamId: objectPreview.streamId,
|
||||
objectId: objectPreview.objectId
|
||||
})
|
||||
.update(objectPreview)
|
||||
.returning<ObjectPreviewRecord[]>('*')
|
||||
}
|
||||
|
||||
export const getPreviewImageFactory =
|
||||
|
||||
@@ -1,44 +1,58 @@
|
||||
import { ProjectSubscriptions } from '@/modules/shared/utils/subscriptions'
|
||||
import { publish } from '@/modules/shared/utils/subscriptions'
|
||||
import { PreviewResultPayload } from '@speckle/shared/workers/previews'
|
||||
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
|
||||
import { throwUncoveredError } from '@speckle/shared'
|
||||
import type { Logger } from '@/observability/logging'
|
||||
import crypto from 'crypto'
|
||||
import { StorePreview, UpsertObjectPreview } from '@/modules/previews/domain/operations'
|
||||
import {
|
||||
BuildConsumePreviewResult,
|
||||
ConsumePreviewResult,
|
||||
StorePreview,
|
||||
UpdateObjectPreview
|
||||
} from '@/modules/previews/domain/operations'
|
||||
import { joinImages } from 'join-images'
|
||||
import { GetObjectCommitsWithStreamIds } from '@/modules/core/domain/commits/operations'
|
||||
import { PreviewPriority, PreviewStatus } from '@/modules/previews/domain/consts'
|
||||
import {
|
||||
storePreviewFactory,
|
||||
updateObjectPreviewFactory
|
||||
} from '@/modules/previews/repository/previews'
|
||||
import { getObjectCommitsWithStreamIdsFactory } from '@/modules/core/repositories/commits'
|
||||
|
||||
export const buildConsumePreviewResult: BuildConsumePreviewResult = async (deps) => {
|
||||
const { logger, projectId } = deps
|
||||
const projectDb = await getProjectDbClient({ projectId })
|
||||
return consumePreviewResultFactory({
|
||||
logger,
|
||||
storePreview: storePreviewFactory({ db: projectDb }),
|
||||
updateObjectPreview: updateObjectPreviewFactory({ db: projectDb }),
|
||||
getObjectCommitsWithStreamIds: getObjectCommitsWithStreamIdsFactory({
|
||||
db: projectDb
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
export const consumePreviewResultFactory =
|
||||
({
|
||||
logger,
|
||||
upsertObjectPreview,
|
||||
updateObjectPreview,
|
||||
storePreview,
|
||||
getObjectCommitsWithStreamIds
|
||||
}: {
|
||||
logger: Logger
|
||||
upsertObjectPreview: UpsertObjectPreview
|
||||
updateObjectPreview: UpdateObjectPreview
|
||||
storePreview: StorePreview
|
||||
getObjectCommitsWithStreamIds: GetObjectCommitsWithStreamIds
|
||||
}) =>
|
||||
async ({
|
||||
projectId,
|
||||
objectId,
|
||||
previewResult
|
||||
}: {
|
||||
projectId: string
|
||||
objectId: string
|
||||
previewResult: PreviewResultPayload
|
||||
}) => {
|
||||
const streamId = projectId
|
||||
}): ConsumePreviewResult =>
|
||||
async ({ projectId, objectId, previewResult }) => {
|
||||
const lastUpdate = new Date()
|
||||
const priority = PreviewPriority.LOW
|
||||
const log = logger.child({
|
||||
jobId: previewResult.jobId,
|
||||
status: previewResult.status,
|
||||
durationSeconds: previewResult.result.durationSeconds,
|
||||
projectId: streamId,
|
||||
streamId, // for legacy reasons
|
||||
projectId,
|
||||
streamId: projectId, // for legacy reasons
|
||||
objectId
|
||||
})
|
||||
|
||||
@@ -48,10 +62,10 @@ export const consumePreviewResultFactory =
|
||||
switch (previewResult.status) {
|
||||
case 'error':
|
||||
log.warn({ reason: previewResult.reason }, previewMessage)
|
||||
await upsertObjectPreview({
|
||||
await updateObjectPreview({
|
||||
objectPreview: {
|
||||
objectId,
|
||||
streamId,
|
||||
streamId: projectId,
|
||||
lastUpdate,
|
||||
preview: { err: previewResult.reason },
|
||||
priority,
|
||||
@@ -97,10 +111,10 @@ export const consumePreviewResultFactory =
|
||||
|
||||
preview['all'] = fullImgId
|
||||
|
||||
await upsertObjectPreview({
|
||||
await updateObjectPreview({
|
||||
objectPreview: {
|
||||
objectId,
|
||||
streamId,
|
||||
streamId: projectId,
|
||||
lastUpdate,
|
||||
preview,
|
||||
priority,
|
||||
@@ -108,7 +122,7 @@ export const consumePreviewResultFactory =
|
||||
}
|
||||
})
|
||||
const commits = await getObjectCommitsWithStreamIds([objectId], {
|
||||
streamIds: [streamId]
|
||||
streamIds: [projectId]
|
||||
})
|
||||
if (!commits.length) break
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
} from '@/modules/previews/domain/operations'
|
||||
import { Roles, Scopes, TIME_MS } from '@speckle/shared'
|
||||
import { TokenResourceIdentifierType } from '@/modules/core/domain/tokens/types'
|
||||
import { toJobId } from '@speckle/shared/workers/previews'
|
||||
|
||||
export const createObjectPreviewFactory =
|
||||
({
|
||||
@@ -39,10 +40,12 @@ export const createObjectPreviewFactory =
|
||||
return false
|
||||
}
|
||||
|
||||
const jobId = toJobId({ projectId: streamId, objectId })
|
||||
|
||||
// we're running the preview generation in the name of a project owner
|
||||
const token = await createAppToken({
|
||||
appId: DefaultAppIds.Web,
|
||||
name: `preview-${streamId}@${objectId}`,
|
||||
name: `preview-${jobId}`,
|
||||
userId,
|
||||
scopes: [Scopes.Streams.Read],
|
||||
lifespan: 2 * TIME_MS.hour,
|
||||
@@ -58,6 +61,10 @@ export const createObjectPreviewFactory =
|
||||
serverOrigin
|
||||
).toString()
|
||||
|
||||
await requestObjectPreview({ jobId: `${streamId}.${objectId}`, token, url })
|
||||
await requestObjectPreview({
|
||||
jobId,
|
||||
token,
|
||||
url
|
||||
})
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -0,0 +1,85 @@
|
||||
import { ensureError } from '@speckle/shared'
|
||||
import type { Logger } from '@/observability/logging'
|
||||
import type { DoneCallback, Job } from 'bull'
|
||||
import type {
|
||||
BuildConsumePreviewResult,
|
||||
ObserveMetrics
|
||||
} from '@/modules/previews/domain/operations'
|
||||
import { StreamNotFoundError } from '@/modules/core/errors/stream'
|
||||
import { fromJobId, previewResultPayload } from '@speckle/shared/workers/previews'
|
||||
|
||||
const parseMessage = (data: string) =>
|
||||
previewResultPayload
|
||||
.transform((data) => {
|
||||
const { projectId, objectId } = fromJobId(data.jobId)
|
||||
return {
|
||||
...data,
|
||||
projectId,
|
||||
objectId
|
||||
}
|
||||
})
|
||||
.safeParse(data)
|
||||
|
||||
export const responseHandlerFactory = (deps: {
|
||||
consumePreviewResultBuilder: BuildConsumePreviewResult
|
||||
observeMetrics: ObserveMetrics
|
||||
logger: Logger
|
||||
}) => {
|
||||
const { observeMetrics, logger, consumePreviewResultBuilder } = deps
|
||||
return async (payload: Pick<Job, 'attemptsMade' | 'data'>, done: DoneCallback) => {
|
||||
const { attemptsMade } = payload
|
||||
const parsedMessage = parseMessage(payload.data)
|
||||
if (!parsedMessage.success) {
|
||||
logger.error(
|
||||
{ payload: payload.data, reason: parsedMessage.error },
|
||||
'Failed to parse previewResult payload'
|
||||
)
|
||||
|
||||
// as we can't parse the response we neither have a job ID nor a duration,
|
||||
// we cannot get a duration to populate previewJobsProcessedSummary.observe
|
||||
|
||||
done(parsedMessage.error)
|
||||
return
|
||||
}
|
||||
|
||||
const parsedPayload = parsedMessage.data
|
||||
const { projectId, objectId } = parsedPayload
|
||||
const jobLogger = logger.child({
|
||||
projectId,
|
||||
objectId,
|
||||
responsePriorAttemptsMade: attemptsMade
|
||||
})
|
||||
|
||||
try {
|
||||
observeMetrics({ payload: parsedPayload })
|
||||
|
||||
const consumePreviewResult = await consumePreviewResultBuilder({
|
||||
logger: jobLogger,
|
||||
projectId
|
||||
})
|
||||
|
||||
await consumePreviewResult({
|
||||
projectId,
|
||||
objectId,
|
||||
previewResult: parsedPayload
|
||||
})
|
||||
} catch (e) {
|
||||
const err = ensureError(e, 'Unknown error when consuming preview result')
|
||||
|
||||
if (err instanceof StreamNotFoundError) {
|
||||
jobLogger.warn(
|
||||
{ err },
|
||||
'Failed to consume preview result; the stream does not exist. Probably deleted while preview was generated.'
|
||||
)
|
||||
done() // don't pass the error to done, as we don't want to retry the job
|
||||
return
|
||||
}
|
||||
|
||||
jobLogger.error({ err }, 'Failed to consume preview result')
|
||||
done(err)
|
||||
return
|
||||
}
|
||||
|
||||
done()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
import { expect } from 'chai'
|
||||
import { responseHandlerFactory } from '@/modules/previews/services/responses'
|
||||
import { testLogger as logger } from '@/observability/logging'
|
||||
import { buildConsumePreviewResult } from '@/modules/previews/resultListener'
|
||||
import cryptoRandomString from 'crypto-random-string'
|
||||
|
||||
describe('object preview @previews', () => {
|
||||
describe('responseHandlerFactory creates a function, that', () => {
|
||||
beforeEach(() => {})
|
||||
it('gracefully handles a response for a non-existent (e.g. deleted in meantime) project', async () => {
|
||||
const handleResponse = responseHandlerFactory({
|
||||
observeMetrics: () => {},
|
||||
logger,
|
||||
consumePreviewResultBuilder: buildConsumePreviewResult
|
||||
})
|
||||
let doneCalled = false
|
||||
let doneErr: Error | null | undefined = null
|
||||
await expect(
|
||||
handleResponse(
|
||||
{
|
||||
data: {
|
||||
jobId: `${cryptoRandomString({ length: 10 })}.${cryptoRandomString({
|
||||
length: 10
|
||||
})}`,
|
||||
status: 'success',
|
||||
result: {
|
||||
durationSeconds: 0,
|
||||
loadDurationSeconds: 0,
|
||||
renderDurationSeconds: 0,
|
||||
screenshots: {
|
||||
'0': 'data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAgAAAAIAQMAAAD+wSzIAAAABlBMVEX///+/v7+jQ3Y5AAAADklEQVQI12P4AIX8EAgALgAD/aNpbtEAAAAASUVORK5CYII'
|
||||
}
|
||||
}
|
||||
},
|
||||
attemptsMade: 0
|
||||
},
|
||||
(err) => {
|
||||
doneCalled = true
|
||||
doneErr = err
|
||||
}
|
||||
)
|
||||
).eventually.be.fulfilled
|
||||
expect(doneCalled).to.be.true
|
||||
expect(doneErr).to.be.undefined
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -497,3 +497,7 @@ export const isRateLimiterEnabled = (): boolean => {
|
||||
export const getFileUploadUrlExpiryMinutes = (): number => {
|
||||
return getIntFromEnv('FILE_UPLOAD_URL_EXPIRY_MINUTES', '1440')
|
||||
}
|
||||
|
||||
export const getPreviewServiceTimeoutMilliseconds = (): number => {
|
||||
return getIntFromEnv('PREVIEW_SERVICE_TIMEOUT_MILLISECONDS', '3600000') // 1 hour
|
||||
}
|
||||
|
||||
@@ -1,7 +1,29 @@
|
||||
import z from 'zod'
|
||||
|
||||
const JobIdSeparator = '.'
|
||||
|
||||
export const jobIdSchema = z
|
||||
.string()
|
||||
.refine((data) => data.split(JobIdSeparator).length === 2, {
|
||||
message: 'jobId must be in the format "projectId.objectId"'
|
||||
})
|
||||
|
||||
export const toJobId = (params: { projectId: string; objectId: string }): string => {
|
||||
return `${params.projectId}${JobIdSeparator}${params.objectId}`
|
||||
}
|
||||
export const fromJobId = (jobId: string): { projectId: string; objectId: string } => {
|
||||
return jobIdSchema
|
||||
.transform((data) => {
|
||||
return {
|
||||
projectId: data.split(JobIdSeparator)[0],
|
||||
objectId: data.split(JobIdSeparator)[1]
|
||||
}
|
||||
})
|
||||
.parse(jobId)
|
||||
}
|
||||
|
||||
const job = z.object({
|
||||
jobId: z.string()
|
||||
jobId: jobIdSchema
|
||||
})
|
||||
|
||||
export const jobPayload = job.merge(
|
||||
|
||||
@@ -761,6 +761,10 @@ Generate the environment variables for Speckle server and Speckle objects deploy
|
||||
{{- if .Values.preview_service.enabled }}
|
||||
- name: PREVIEW_SERVICE_USE_PRIVATE_OBJECTS_SERVER_URL
|
||||
value: "true"
|
||||
{{- if .Values.preview_service.puppeteer.timeoutMilliseconds }}
|
||||
- name: PREVIEW_SERVICE_TIMEOUT_MILLISECONDS
|
||||
value: {{ .Values.preview_service.puppeteer.timeoutMilliseconds | quote }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
||||
# *** Redis ***
|
||||
|
||||
Reference in New Issue
Block a user