diff --git a/packages/server/modules/multiregion/domain/operations.ts b/packages/server/modules/multiregion/domain/operations.ts index cd79f6d88..8a0c0d368 100644 --- a/packages/server/modules/multiregion/domain/operations.ts +++ b/packages/server/modules/multiregion/domain/operations.ts @@ -9,6 +9,7 @@ import type { InsertableRegionRecord } from '@/modules/multiregion/helpers/types import type { Optional } from '@speckle/shared' import type { ObjectStorage } from '@/modules/blobstorage/clients/objectStorage' import type { Stream } from '@/modules/core/domain/streams/types' +import type { MultiregionJob } from '@/modules/multiregion/services/queue' export type GetRegions = () => Promise export type GetRegion = (params: { key: string }) => Promise> @@ -69,3 +70,5 @@ export type GetProjectObjectStorage = (args: { export type GetRegionObjectStorage = (args: { regionKey: string }) => Promise<{ private: ObjectStorage; public: ObjectStorage }> + +export type ScheduleMultiregionJob = (args: MultiregionJob) => Promise diff --git a/packages/server/modules/multiregion/graph/resolvers/index.ts b/packages/server/modules/multiregion/graph/resolvers/index.ts index 04c7e1f84..5c438f936 100644 --- a/packages/server/modules/multiregion/graph/resolvers/index.ts +++ b/packages/server/modules/multiregion/graph/resolvers/index.ts @@ -19,6 +19,7 @@ import { } from '@/modules/multiregion/services/management' import { initializeRegion as initializeBlobStorage } from '@/modules/multiregion/utils/blobStorageSelector' import { withOperationLogging } from '@/observability/domain/businessLogging' +import { scheduleJob } from '@/modules/multiregion/services/queue' export default { ServerMultiRegionConfiguration: { @@ -53,7 +54,8 @@ export default { initializeRegion: initializeRegionClients({ initializeDb, initializeBlobStorage - }) + }), + scheduleJob }) return await withOperationLogging( async () => await createAndValidateNewRegion({ region: args.input }), diff --git a/packages/server/modules/multiregion/services/management.ts b/packages/server/modules/multiregion/services/management.ts index 39426aef6..cd0bff827 100644 --- a/packages/server/modules/multiregion/services/management.ts +++ b/packages/server/modules/multiregion/services/management.ts @@ -3,6 +3,7 @@ import type { GetFreeRegionKeys, GetRegion, InitializeRegion, + ScheduleMultiregionJob, StoreRegion, UpdateAndValidateRegion, UpdateRegion @@ -19,12 +20,14 @@ export const createAndValidateNewRegionFactory = getFreeRegionKeys, getRegion, initializeRegion, - storeRegion + storeRegion, + scheduleJob }: { getFreeRegionKeys: GetFreeRegionKeys getRegion: GetRegion storeRegion: StoreRegion initializeRegion: InitializeRegion + scheduleJob: ScheduleMultiregionJob }): CreateAndValidateNewRegion => async ({ region }) => { const [existingRegion, freeKeys] = await Promise.all([ @@ -45,7 +48,16 @@ export const createAndValidateNewRegionFactory = await initializeRegion({ regionKey: region.key }) - return await storeRegion({ region }) + const record = await storeRegion({ region }) + + await scheduleJob({ + type: 'initialize-region-data', + payload: { + regionKey: region.key + } + }) + + return record } export const updateAndValidateRegionFactory = diff --git a/packages/server/modules/multiregion/services/queue.ts b/packages/server/modules/multiregion/services/queue.ts index c32b56c79..e62c26129 100644 --- a/packages/server/modules/multiregion/services/queue.ts +++ b/packages/server/modules/multiregion/services/queue.ts @@ -22,7 +22,7 @@ import { moveProjectToRegionFactory, validateProjectRegionCopyFactory } from '@/modules/workspaces/services/projectRegions' -import { db } from '@/db/knex' +import { db, mainDb } from '@/db/knex' import { deleteProjectFactory, getProjectFactory @@ -64,6 +64,19 @@ import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/com import { deleteProjectAndCommitsFactory } from '@/modules/core/services/projects' import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits' import { getProjectRegionKey } from '@/modules/multiregion/utils/regionSelector' +import { + copyAllUsersAcrossRegionsFactory, + copyAllWorkspacesAcrossRegionsFactory +} from '@/modules/multiregion/tasks/regionSync' +import { + bulkUpsertUsersFactory, + getAllUsersFactory +} from '@/modules/core/repositories/users' +import { + bulkUpsertWorkspacesFactory, + getAllWorkspacesFactory +} from '@/modules/workspaces/repositories/workspaces' +import type { ScheduleMultiregionJob } from '@/modules/multiregion/domain/operations' const MULTIREGION_QUEUE_NAME = isTestEnv() ? `test:multiregion:${cryptoRandomString({ length: 5 })}` @@ -74,7 +87,7 @@ if (isTestEnv()) { logger.info(`Monitor using: 'yarn cli bull monitor ${MULTIREGION_QUEUE_NAME}'`) } -type MultiregionJob = +export type MultiregionJob = | { type: 'move-project-region' payload: { @@ -89,6 +102,12 @@ type MultiregionJob = regionKey: string } } + | { + type: 'initialize-region-data' + payload: { + regionKey: string + } + } let queue: Optional> @@ -132,7 +151,7 @@ export const initializeQueue = async () => { /** * Add a job to the multiregion job queue. */ -export const scheduleJob = async (jobData: MultiregionJob): Promise => { +export const scheduleJob: ScheduleMultiregionJob = async (jobData) => { const queue = getQueue() const job = await queue.add(jobData) return job.id.toString() @@ -141,7 +160,8 @@ export const scheduleJob = async (jobData: MultiregionJob): Promise => { const isMultiregionJob = (job: Bull.Job): job is Bull.Job => { const jobTypes: MultiregionJob['type'][] = [ 'move-project-region', - 'delete-project-region-data' + 'delete-project-region-data', + 'initialize-region-data' ] return !!job.data.type && jobTypes.includes(job.data.type) } @@ -151,6 +171,7 @@ const isMultiregionJob = (job: Bull.Job): job is Bull.Job => { */ export const startQueue = async () => { const queue = getQueue() + void queue.process(async (job) => { if (!isMultiregionJob(job)) { throw new MultiRegionInvalidJobError() @@ -167,6 +188,48 @@ export const startQueue = async () => { ) switch (job.data.type) { + case 'initialize-region-data': + const regionDb = await getRegionDb({ regionKey: job.data.payload.regionKey }) + + if (!regionDb) + throw new MultiRegionInvalidJobError('New target region not found') + + logger.info( + { + jobId: job.id, + jobQueue: MULTIREGION_QUEUE_NAME, + payload: job.data.payload, + type: job.data.type, + regionKey: job.data.payload.regionKey + }, + 'Copying user data to new region' + ) + + await copyAllUsersAcrossRegionsFactory({ + getAllUsers: getAllUsersFactory({ db: mainDb }), + bulkUpsertUsers: bulkUpsertUsersFactory({ db: regionDb }) + })({ + logger + }) + + logger.info( + { + jobId: job.id, + jobQueue: MULTIREGION_QUEUE_NAME, + payload: job.data.payload, + type: job.data.type, + regionKey: job.data.payload.regionKey + }, + 'Copying workspace data to new region' + ) + + await copyAllWorkspacesAcrossRegionsFactory({ + getAllWorkspaces: getAllWorkspacesFactory({ db: mainDb }), + bulkUpsertWorkspaces: bulkUpsertWorkspacesFactory({ db: regionDb }) + })({ + logger + }) + break case 'move-project-region': { const { projectId, regionKey: targetRegionKey } = job.data.payload @@ -317,18 +380,20 @@ export const startQueue = async () => { throw new MultiRegionNotYetImplementedError() } }) + void queue.on('completed', (job) => { - const { projectId, regionKey } = job.data.payload + const { regionKey } = job.data.payload logger.info( { jobId: job.id, jobQueue: MULTIREGION_QUEUE_NAME, - projectId, + payload: job.data.payload, regionKey }, 'Completed multiregion job {jobId}' ) }) + void queue.on('failed', (job, err) => { logger.error( { @@ -340,6 +405,7 @@ export const startQueue = async () => { 'Failed to process multiregion job {jobId}' ) }) + void queue.on('error', (err) => { logger.error( { diff --git a/packages/server/test/hooks.ts b/packages/server/test/hooks.ts index f6b88ca8d..6e2ac50ad 100644 --- a/packages/server/test/hooks.ts +++ b/packages/server/test/hooks.ts @@ -121,7 +121,9 @@ const setupDatabases = async () => { }), getRegion: getRegionFactory({ db }), storeRegion: storeRegionFactory({ db }), - initializeRegion + initializeRegion, + // As db starts from scratch, no need to sync regions + scheduleJob: () => Promise.resolve('') }) for (const regionKey of regionKeys) { await createRegion({