Files
speckle-server/packages/server/modules/multiregion/services/queue.ts
T
Daniel Gak Anagrov 399c998fd7 feat(multiregion): apply prepared transactions to projects (#5322)
* feat(multiregion): replace user replication

* chore(multiregion): optimise replication

* maybe it's this

* postgres is fun

* once more

* chore(multiregion): only replicate test user creation during multiregion tests

* feat: improved replicate_query logic

* fix: minor

* fix: starting issue

* feat: included user create and delete specs to multiregion

* feat: removed console logs

* fix: user defaults

* fix: multiregion test helper

* fix: update scenarios for users

* refactor(multiregion): swap replicateQuery concept to asMultiregionOperation (#5301)

feat(multiregion): introduced asMultregionOperator, refactor test to user builder classes

* chore: renamings

* fix: remove comments

* feat: remove user replication

* refactor: simplified spec usages

* chore: comments

* chore: branches and favs

* chore: more tests

* chore: more tests

* fix linting

* fix tests

* feat: dropping replication

* refactor: moved project delete to service

* fix: comment

* feat: updateStreamFactory and updateProjectFacotry

* deleteProjectFactory + replicateFactory

* deleteWorkspaceFactory

* fix: selector

* fix: tests

* fix tests, finished createStreamFactory

* feat: simplify changes

* fix: remove comment

* fix: minor strucutres

* fix: moveProjectToRegion

* fix: moved branch creation outside of multiregion scope

* fix: branch creation

* fix: tests

* fix: ci tests

* fix: removed log form test

* fix: on specs, no random regionKeys

* review fixes

* fix: mr comments

* feat: removed test

---------

Co-authored-by: Charles Driesler <chuck@speckle.systems>
2025-09-04 13:07:19 +02:00

325 lines
10 KiB
TypeScript

import type Bull from 'bull'
import { logger } from '@/observability/logging'
import { isProdEnv, isTestEnv } from '@/modules/shared/helpers/envHelper'
import cryptoRandomString from 'crypto-random-string'
import type { Optional } from '@speckle/shared'
import { TIME_MS } from '@speckle/shared'
import { UninitializedResourceAccessError } from '@/modules/shared/errors'
import {
MultiRegionInvalidJobError,
MultiRegionNotYetImplementedError
} from '@/modules/multiregion/errors'
import {
getProjectDbClient,
getRegionDb,
getReplicationDbs
} from '@/modules/multiregion/utils/dbSelector'
import {
getProjectObjectStorage,
getRegionObjectStorage
} from '@/modules/multiregion/utils/blobStorageSelector'
import {
moveProjectToRegionFactory,
validateProjectRegionCopyFactory
} from '@/modules/workspaces/services/projectRegions'
import { db } from '@/db/knex'
import {
getProjectFactory,
storeProjectRolesFactory
} from '@/modules/core/repositories/projects'
import { getAvailableRegionsFactory } from '@/modules/workspaces/services/regions'
import { getRegionsFactory } from '@/modules/multiregion/repositories'
import { canWorkspaceUseRegionsFactory } from '@/modules/gatekeeper/services/featureAuthorization'
import { getWorkspacePlanFactory } from '@/modules/gatekeeper/repositories/billing'
import {
upsertProjectRegionKeyFactory,
deleteRegionKeyFromCacheFactory
} from '@/modules/multiregion/repositories/projectRegion'
import { updateProjectRegionKeyFactory } from '@/modules/multiregion/services/projectRegion'
import { getGenericRedis } from '@/modules/shared/redis/redis'
import { initializeQueue as setupQueue } from '@speckle/shared/queue'
import {
copyWorkspaceFactory,
copyProjectsFactory,
copyProjectModelsFactory,
copyProjectVersionsFactory,
copyProjectObjectsFactory,
copyProjectAutomationsFactory,
copyProjectCommentsFactory,
copyProjectWebhooksFactory,
copyProjectBlobs,
countProjectModelsFactory,
countProjectVersionsFactory,
countProjectObjectsFactory,
countProjectAutomationsFactory,
countProjectCommentsFactory,
countProjectWebhooksFactory
} from '@/modules/workspaces/repositories/projectRegions'
import { withTransaction } from '@/modules/shared/helpers/dbHelper'
import { getRedisUrl } from '@/modules/shared/helpers/envHelper'
import { chunk } from 'lodash-es'
import { getStreamCollaboratorsFactory } from '@/modules/core/repositories/streams'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
const MULTIREGION_QUEUE_NAME = isTestEnv()
? `test:multiregion:${cryptoRandomString({ length: 5 })}`
: 'default:multiregion'
if (isTestEnv()) {
logger.info(`Multiregion test queue ID: ${MULTIREGION_QUEUE_NAME}`)
logger.info(`Monitor using: 'yarn cli bull monitor ${MULTIREGION_QUEUE_NAME}'`)
}
type MultiregionJob =
| {
type: 'move-project-region'
payload: {
projectId: string
regionKey: string
}
}
| {
type: 'delete-project-region-data'
payload: {
projectId: string
regionKey: string
}
}
let queue: Optional<Bull.Queue<MultiregionJob>>
export const getQueue = (): Bull.Queue<MultiregionJob> => {
if (!queue) {
throw new UninitializedResourceAccessError(
'Attempting to use uninitialized Bull queue'
)
}
return queue
}
export const initializeQueue = async () => {
queue = await setupQueue({
queueName: MULTIREGION_QUEUE_NAME,
redisUrl: getRedisUrl(),
options: {
...(!isTestEnv()
? {
limiter: {
max: 10,
duration: TIME_MS.second
}
}
: {}),
defaultJobOptions: {
attempts: 5,
timeout: 15 * TIME_MS.minute,
backoff: {
type: 'fixed',
delay: 5 * TIME_MS.minute
},
removeOnComplete: isProdEnv(),
removeOnFail: false
}
}
})
}
/**
* Add a job to the multiregion job queue.
*/
export const scheduleJob = async (jobData: MultiregionJob): Promise<string> => {
const queue = getQueue()
const job = await queue.add(jobData)
return job.id.toString()
}
const isMultiregionJob = (job: Bull.Job): job is Bull.Job<MultiregionJob> => {
const jobTypes: MultiregionJob['type'][] = [
'move-project-region',
'delete-project-region-data'
]
return !!job.data.type && jobTypes.includes(job.data.type)
}
/**
* Start processing jobs in queue in current process.
*/
export const startQueue = async () => {
const queue = getQueue()
void queue.process(async (job) => {
if (!isMultiregionJob(job)) {
throw new MultiRegionInvalidJobError()
}
logger.info(
{
jobId: job.id,
jobQueue: MULTIREGION_QUEUE_NAME,
payload: job.data.payload,
type: job.data.type
},
'Processing multiregion job {jobId}'
)
switch (job.data.type) {
case 'move-project-region': {
const { projectId, regionKey } = job.data.payload
const sourceDb = await getProjectDbClient({ projectId })
const sourceObjectStorage = (await getProjectObjectStorage({ projectId }))
.private
const targetDb = await getRegionDb({ regionKey })
const targetObjectStorage = (await getRegionObjectStorage({ regionKey }))
.private
// Move project to target region
await withTransaction(
async ({ db: targetDbTrx }) => {
const moveProjectToRegion = moveProjectToRegionFactory({
getProject: getProjectFactory({ db: sourceDb }),
getAvailableRegions: getAvailableRegionsFactory({
getRegions: getRegionsFactory({ db }),
canWorkspaceUseRegions: canWorkspaceUseRegionsFactory({
getWorkspacePlan: getWorkspacePlanFactory({ db })
})
}),
copyWorkspace: copyWorkspaceFactory({
sourceDb,
targetDb: targetDbTrx
}),
copyProjects: copyProjectsFactory({
sourceDb,
targetDb: targetDbTrx
}),
copyProjectModels: copyProjectModelsFactory({
sourceDb,
targetDb: targetDbTrx
}),
copyProjectVersions: copyProjectVersionsFactory({
sourceDb,
targetDb: targetDbTrx
}),
copyProjectObjects: copyProjectObjectsFactory({
sourceDb,
targetDb: targetDbTrx
}),
copyProjectAutomations: copyProjectAutomationsFactory({
sourceDb,
targetDb: targetDbTrx
}),
copyProjectComments: copyProjectCommentsFactory({
sourceDb,
targetDb: targetDbTrx
}),
copyProjectWebhooks: copyProjectWebhooksFactory({
sourceDb,
targetDb: targetDbTrx
}),
copyProjectBlobs: copyProjectBlobs({
sourceDb,
sourceObjectStorage,
targetDb: targetDbTrx,
targetObjectStorage
}),
validateProjectRegionCopy: validateProjectRegionCopyFactory({
countProjectModels: countProjectModelsFactory({ db: sourceDb }),
countProjectVersions: countProjectVersionsFactory({ db: sourceDb }),
countProjectObjects: countProjectObjectsFactory({ db: sourceDb }),
countProjectAutomations: countProjectAutomationsFactory({
db: sourceDb
}),
countProjectComments: countProjectCommentsFactory({ db: sourceDb }),
countProjectWebhooks: countProjectWebhooksFactory({ db: sourceDb })
})
})
await moveProjectToRegion({ projectId, regionKey })
},
{
db: targetDb
}
)
// Update project region in dbs and update relevant caches
const project = await asMultiregionalOperation(
async ({ allDbs, emit }) =>
updateProjectRegionKeyFactory({
upsertProjectRegionKey: replicateFactory(
allDbs,
upsertProjectRegionKeyFactory
),
cacheDeleteRegionKey: deleteRegionKeyFromCacheFactory({
redis: getGenericRedis()
}),
emitEvent: emit
})({ projectId, regionKey }),
{
name: 'updateProjectRegion',
description: 'Update project region in db and update relevant caches',
logger,
dbs: await getReplicationDbs({ regionKey })
}
)
// Grab project roles for later reinstating
const projectRoles = await getStreamCollaboratorsFactory({ db })(project.id)
// Reinstate project acl records
for (const roles of chunk(projectRoles, 10_000)) {
await storeProjectRolesFactory({ db })({
roles: roles.map((role) => ({
projectId: project.id,
userId: role.id,
role: role.streamRole
}))
})
}
return
}
case 'delete-project-region-data':
default:
throw new MultiRegionNotYetImplementedError()
}
})
void queue.on('completed', (job) => {
const { projectId, regionKey } = job.data.payload
logger.info(
{
jobId: job.id,
jobQueue: MULTIREGION_QUEUE_NAME,
projectId,
regionKey
},
'Completed multiregion job {jobId}'
)
})
void queue.on('failed', (job, err) => {
logger.error(
{
jobId: job.id,
jobQueue: MULTIREGION_QUEUE_NAME,
error: err,
errorMessage: err.message
},
'Failed to process multiregion job {jobId}'
)
})
void queue.on('error', (err) => {
logger.error(
{
jobQueue: MULTIREGION_QUEUE_NAME,
error: err,
errorMessage: err.message
},
'Failed to process multiregion job'
)
})
}
export const shutdownQueue = async () => {
if (!queue) return
await queue.close()
}