fix(multiregion): make move region job safe for replication (#4907)

* chore(multiregion): failing test for move project region

* fix(multiregion): unblock replication after regional move

* fix(regions): try-catch replication

* fix(regions): cache roles within job

* fix(regions): toggle region key in target db
This commit is contained in:
Chuck Driesler
2025-06-25 15:47:34 +01:00
committed by GitHub
parent 2ef38a3962
commit 2f36e518a0
12 changed files with 273 additions and 158 deletions
@@ -58,7 +58,10 @@ import { authorizeResolver } from '@/modules/shared'
import { Roles } from '@speckle/shared'
import { getDefaultRegionFactory } from '@/modules/workspaces/repositories/regions'
import { getDb } from '@/modules/multiregion/utils/dbSelector'
import { createNewProjectFactory } from '@/modules/core/services/projects'
import {
createNewProjectFactory,
waitForRegionProjectFactory
} from '@/modules/core/services/projects'
import {
deleteProjectFactory,
getProjectFactory,
@@ -190,11 +193,13 @@ const command: CommandModule<
const createNewProject = createNewProjectFactory({
storeProject: storeProjectFactory({ db: projectDb }),
getProject: getProjectFactory({ db: projectDb }),
deleteProject: deleteProjectFactory({ db: projectDb }),
storeModel: storeModelFactory({ db: projectDb }),
// THIS MUST GO TO THE MAIN DB
storeProjectRole: storeProjectRoleFactory({ db }),
waitForRegionProject: waitForRegionProjectFactory({
getProject: getProjectFactory({ db: projectDb }),
deleteProject: deleteProjectFactory({ db: projectDb })
}),
emitEvent: getEventBus().emit
})
@@ -14,6 +14,14 @@ export type StoreProjectRole = (args: {
role: StreamRoles
}) => Promise<void>
export type StoreProjectRoles = (args: {
roles: {
projectId: string
userId: string
role: StreamRoles
}[]
}) => Promise<void>
export type UpsertProjectRole = (
args: {
projectId: string
@@ -59,3 +67,9 @@ export type StoreModel = (params: {
projectId: string
authorId: string
}) => Promise<void>
export type WaitForRegionProject = (params: {
projectId: string
regionKey: string
maxAttempts?: number
}) => Promise<void>
@@ -48,7 +48,10 @@ import {
getUserStreamsCountFactory
} from '@/modules/core/repositories/streams'
import { getUserFactory, getUsersFactory } from '@/modules/core/repositories/users'
import { createNewProjectFactory } from '@/modules/core/services/projects'
import {
createNewProjectFactory,
waitForRegionProjectFactory
} from '@/modules/core/services/projects'
import { throwIfRateLimitedFactory } from '@/modules/core/utils/ratelimiter'
import {
addOrUpdateStreamCollaboratorFactory,
@@ -463,11 +466,13 @@ const resolvers: Resolvers = {
const createNewProject = createNewProjectFactory({
storeProject: storeProjectFactory({ db: projectDb }),
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db: projectDb }),
storeModel: storeModelFactory({ db: projectDb }),
// THIS MUST GO TO THE MAIN DB
storeProjectRole: storeProjectRoleFactory({ db }),
waitForRegionProject: waitForRegionProjectFactory({
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db: projectDb })
}),
emitEvent: getEventBus().emit
})
@@ -3,7 +3,8 @@ import {
DeleteProject,
GetProject,
StoreProject,
StoreProjectRole
StoreProjectRole,
StoreProjectRoles
} from '@/modules/core/domain/projects/operations'
import { Project } from '@/modules/core/domain/streams/types'
import { StreamAclRecord } from '@/modules/core/helpers/types'
@@ -35,6 +36,18 @@ export const deleteProjectFactory =
export const storeProjectRoleFactory =
({ db }: { db: Knex }): StoreProjectRole =>
async ({ projectId, userId, role }) => {
await tables.projectAcl(db).insert({ resourceId: projectId, role, userId })
async (role) => {
await storeProjectRolesFactory({ db })({ roles: [role] })
}
export const storeProjectRolesFactory =
({ db }: { db: Knex }): StoreProjectRoles =>
async ({ roles }) => {
await tables.projectAcl(db).insert(
roles.map((role) => ({
resourceId: role.projectId,
userId: role.userId,
role: role.role
}))
)
}
@@ -1078,6 +1078,7 @@ export const updateStreamFactory =
return updatedStream
}
/** @deprecated Use `updateStreamFactory` */
export const updateProjectFactory =
({ db }: { db: Knex }): UpdateProject =>
async ({ projectUpdate }) => {
@@ -6,7 +6,8 @@ import {
GetProject,
StoreModel,
StoreProject,
StoreProjectRole
StoreProjectRole,
WaitForRegionProject
} from '@/modules/core/domain/projects/operations'
import { Project } from '@/modules/core/domain/streams/types'
import { RegionalProjectCreationError } from '@/modules/core/errors/projects'
@@ -22,18 +23,16 @@ import cryptoRandomString from 'crypto-random-string'
export const createNewProjectFactory =
({
storeProject,
getProject,
deleteProject,
storeProjectRole,
storeModel,
waitForRegionProject,
emitEvent
}: {
storeProject: StoreProject
getProject: GetProject
deleteProject: DeleteProject
storeProjectRole: StoreProjectRole
emitEvent: EventBusEmit
storeModel: StoreModel
waitForRegionProject: WaitForRegionProject
emitEvent: EventBusEmit
}): CreateProject =>
async ({ description, name, regionKey, visibility, workspaceId, ownerId }) => {
visibility =
@@ -57,25 +56,10 @@ export const createNewProjectFactory =
const projectId = project.id
// if regionKey, we need to make sure it is actually written and synced
if (regionKey) {
try {
await retry(
async () => {
const replicatedProject = await getProject({ projectId })
if (!replicatedProject) throw new StreamNotFoundError()
},
{ maxAttempts: 10, delay: isTestEnv() ? TIME_MS.second : undefined }
)
} catch (err) {
if (err instanceof StreamNotFoundError) {
// delete from region
await deleteProject({ projectId })
throw new RegionalProjectCreationError(undefined, {
info: { projectId, regionKey }
})
}
// else throw as is
throw err
}
await waitForRegionProject({
projectId,
regionKey
})
}
await storeProjectRole({ projectId, userId: ownerId, role: Roles.Stream.Owner })
await storeModel({
@@ -98,3 +82,30 @@ export const createNewProjectFactory =
})
return project
}
export const waitForRegionProjectFactory =
(deps: {
getProject: GetProject
deleteProject: DeleteProject
}): WaitForRegionProject =>
async ({ projectId, regionKey, maxAttempts = 10 }) => {
try {
await retry(
async () => {
const replicatedProject = await deps.getProject({ projectId })
if (!replicatedProject) throw new StreamNotFoundError()
},
{ maxAttempts, delay: isTestEnv() ? TIME_MS.second : undefined }
)
} catch (err) {
if (err instanceof StreamNotFoundError) {
// delete from region
await deps.deleteProject({ projectId })
throw new RegionalProjectCreationError(undefined, {
info: { projectId, regionKey }
})
}
// else throw as is
throw err
}
}
@@ -3,7 +3,10 @@ import { Project } from '@/modules/core/domain/streams/types'
import { RegionalProjectCreationError } from '@/modules/core/errors/projects'
import { StreamNotFoundError } from '@/modules/core/errors/stream'
import { ProjectRecordVisibility } from '@/modules/core/helpers/types'
import { createNewProjectFactory } from '@/modules/core/services/projects'
import {
createNewProjectFactory,
waitForRegionProjectFactory
} from '@/modules/core/services/projects'
import { isSpecificEventPayload } from '@/modules/shared/services/eventBus'
import { expectToThrow } from '@/test/assertionHelper'
import { Roles, StreamRoles } from '@speckle/shared'
@@ -19,14 +22,11 @@ describe('project services @core', () => {
storeProject: async ({ project }) => {
storedProject = project
},
getProject: async () => {
expect.fail()
},
deleteProject: async () => {
expect.fail()
},
storeProjectRole: async () => {},
storeModel: async () => {},
waitForRegionProject: async () => {
expect.fail()
},
emitEvent: async () => {}
})
const project = await createNewProject({ ownerId })
@@ -45,14 +45,11 @@ describe('project services @core', () => {
storeProject: async ({ project }) => {
storedProject = project
},
getProject: async () => {
expect.fail()
},
deleteProject: async () => {
expect.fail()
},
storeProjectRole: async () => {},
storeModel: async () => {},
waitForRegionProject: async () => {
expect.fail()
},
emitEvent: async () => {}
})
@@ -72,14 +69,11 @@ describe('project services @core', () => {
storeProject: async ({ project }) => {
storedProject = project
},
getProject: async () => {
expect.fail()
},
deleteProject: async () => {
expect.fail()
},
storeProjectRole: async () => {},
storeModel: async () => {},
waitForRegionProject: async () => {
expect.fail()
},
emitEvent: async () => {}
})
@@ -97,14 +91,11 @@ describe('project services @core', () => {
storeProject: async ({ project }) => {
storedProject = project
},
getProject: async () => {
expect.fail()
},
deleteProject: async () => {
expect.fail()
},
storeProjectRole: async () => {},
storeModel: async () => {},
waitForRegionProject: async () => {
expect.fail()
},
emitEvent: async () => {}
})
const project = await createNewProject({ ownerId, visibility: 'PRIVATE' })
@@ -113,78 +104,11 @@ describe('project services @core', () => {
expect(storedProject!.visibility).to.eq(ProjectRecordVisibility.Private)
expect(storedProject!.allowPublicComments).to.be.false
})
it('deletes the created project if getProject throws StreamNotFoundError', async () => {
const ownerId = cryptoRandomString({ length: 10 })
let storedProjectId: string | undefined = undefined
let deletedProjectId: string | undefined = undefined
const createNewProject = createNewProjectFactory({
storeProject: async ({ project }) => {
storedProjectId = project.id
},
getProject: async () => {
throw new StreamNotFoundError()
},
deleteProject: async ({ projectId }) => {
deletedProjectId = projectId
},
storeProjectRole: async () => {
expect.fail()
},
storeModel: async () => {
expect.fail()
},
emitEvent: async () => {
expect.fail()
}
})
const err = await expectToThrow(async () => {
await createNewProject({
ownerId,
regionKey: cryptoRandomString({ length: 10 })
})
})
expect(storedProjectId).to.equal(deletedProjectId)
expect(err.message).to.equal(new RegionalProjectCreationError().message)
})
it('just throws the error from the project getter', async () => {
const ownerId = cryptoRandomString({ length: 10 })
let deletedProjectId: string | undefined = undefined
const kabumm = 'kabumm'
const createNewProject = createNewProjectFactory({
storeProject: async () => {},
getProject: async () => {
throw new Error(kabumm)
},
deleteProject: async ({ projectId }) => {
deletedProjectId = projectId
},
storeProjectRole: async () => {
expect.fail()
},
storeModel: async () => {
expect.fail()
},
emitEvent: async () => {
expect.fail()
}
})
const err = await expectToThrow(async () => {
await createNewProject({
ownerId,
regionKey: cryptoRandomString({ length: 10 })
})
})
expect(deletedProjectId).to.be.undefined
expect(err.message).to.equal(kabumm)
})
it('continues if the project is eventually synced', async () => {
const ownerId = cryptoRandomString({ length: 10 })
let queriedProjectId: string | undefined = undefined
let storedProject: Project | undefined = undefined
let retryCount = 0
let storedProjectRole:
| {
projectId: string
@@ -206,21 +130,15 @@ describe('project services @core', () => {
storeProject: async ({ project }) => {
storedProject = project
},
getProject: async ({ projectId }) => {
queriedProjectId = projectId
retryCount++
if (retryCount > 3) return {} as Project
throw new StreamNotFoundError()
},
deleteProject: async () => {
expect.fail()
},
storeProjectRole: async (args) => {
storedProjectRole = args
},
storeModel: async (args) => {
storedModel = args
},
waitForRegionProject: async ({ projectId }) => {
queriedProjectId = projectId
},
emitEvent: async (payload) => {
if (isSpecificEventPayload(payload, ProjectEvents.Created)) {
emitedEvent = payload.eventName
@@ -277,18 +195,15 @@ describe('project services @core', () => {
storeProject: async ({ project }) => {
storedProject = project
},
getProject: async () => {
expect.fail()
},
deleteProject: async () => {
expect.fail()
},
storeProjectRole: async (args) => {
storedProjectRole = args
},
storeModel: async (args) => {
storedModel = args
},
waitForRegionProject: async () => {
expect.fail()
},
emitEvent: async (payload) => {
if (isSpecificEventPayload(payload, ProjectEvents.Created)) {
emitedEvent = payload.eventName
@@ -317,4 +232,49 @@ describe('project services @core', () => {
})
})
})
describe('waitForRegionProject creates a function, that', () => {
it('deletes the created project if getProject throws StreamNotFoundError', async () => {
const storedProjectId = cryptoRandomString({ length: 10 })
let deletedProjectId: string | undefined = undefined
const waitForRegionProject = waitForRegionProjectFactory({
getProject: async () => {
throw new StreamNotFoundError()
},
deleteProject: async ({ projectId }) => {
deletedProjectId = projectId
}
})
const err = await expectToThrow(async () => {
await waitForRegionProject({
projectId: storedProjectId,
regionKey: cryptoRandomString({ length: 10 })
})
})
expect(storedProjectId).to.equal(deletedProjectId)
expect(err.message).to.equal(new RegionalProjectCreationError().message)
})
it('just throws the error from the project getter', async () => {
const projectId = cryptoRandomString({ length: 10 })
let deletedProjectId: string | undefined = undefined
const kabumm = 'kabumm'
const waitForRegionProject = waitForRegionProjectFactory({
getProject: async () => {
throw new Error(kabumm)
},
deleteProject: async ({ projectId }) => {
deletedProjectId = projectId
}
})
const err = await expectToThrow(async () => {
await waitForRegionProject({
projectId,
regionKey: cryptoRandomString({ length: 10 })
})
})
expect(deletedProjectId).to.be.undefined
expect(err.message).to.equal(kabumm)
})
})
})
@@ -60,7 +60,10 @@ import {
getViewerResourcesFromLegacyIdentifiersFactory
} from '@/modules/core/services/commit/viewerResources'
import { createObjectFactory } from '@/modules/core/services/objects/management'
import { createNewProjectFactory } from '@/modules/core/services/projects'
import {
createNewProjectFactory,
waitForRegionProjectFactory
} from '@/modules/core/services/projects'
import { downloadCommitFactory } from '@/modules/cross-server-sync/services/commit'
import { ensureOnboardingProjectFactory } from '@/modules/cross-server-sync/services/onboardingProject'
import { downloadProjectFactory } from '@/modules/cross-server-sync/services/project'
@@ -144,10 +147,12 @@ const crossServerSyncModule: SpeckleModule = {
const createNewProject = createNewProjectFactory({
storeProject: storeProjectFactory({ db }),
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db }),
storeModel: storeModelFactory({ db }),
storeProjectRole: storeProjectRoleFactory({ db }),
waitForRegionProject: waitForRegionProjectFactory({
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db })
}),
emitEvent: getEventBus().emit
})
@@ -18,7 +18,12 @@ import {
validateProjectRegionCopyFactory
} from '@/modules/workspaces/services/projectRegions'
import { db } from '@/db/knex'
import { getProjectFactory } from '@/modules/core/repositories/projects'
import {
deleteProjectFactory,
getProjectFactory,
storeProjectFactory,
storeProjectRolesFactory
} from '@/modules/core/repositories/projects'
import { getAvailableRegionsFactory } from '@/modules/workspaces/services/regions'
import { getRegionsFactory } from '@/modules/multiregion/repositories'
import { canWorkspaceUseRegionsFactory } from '@/modules/gatekeeper/services/featureAuthorization'
@@ -50,6 +55,9 @@ import {
} from '@/modules/workspaces/repositories/projectRegions'
import { withTransaction } from '@/modules/shared/helpers/dbHelper'
import { getRedisUrl } from '@/modules/shared/helpers/envHelper'
import { waitForRegionProjectFactory } from '@/modules/core/services/projects'
import { chunk } from 'lodash'
import { getStreamCollaboratorsFactory } from '@/modules/core/repositories/streams'
const MULTIREGION_QUEUE_NAME = isTestEnv()
? `test:multiregion:${cryptoRandomString({ length: 5 })}`
@@ -161,7 +169,8 @@ export const startQueue = async () => {
const targetDb = await getRegionDb({ regionKey })
const targetObjectStorage = await getRegionObjectStorage({ regionKey })
return await withTransaction(
// Move project to target region
const project = await withTransaction(
async ({ db: targetDbTrx }) => {
const updateProjectRegion = updateProjectRegionFactory({
getProject: getProjectFactory({ db: sourceDb }),
@@ -220,7 +229,9 @@ export const startQueue = async () => {
countProjectWebhooks: countProjectWebhooksFactory({ db: sourceDb })
}),
updateProjectRegionKey: updateProjectRegionKeyFactory({
upsertProjectRegionKey: upsertProjectRegionKeyFactory({ db }),
upsertProjectRegionKey: upsertProjectRegionKeyFactory({
db: targetDbTrx
}),
cacheDeleteRegionKey: deleteRegionKeyFromCacheFactory({
redis: getGenericRedis()
}),
@@ -232,6 +243,39 @@ export const startQueue = async () => {
},
{ db: targetDb }
)
// Grab project roles for later reinstating
const projectRoles = await getStreamCollaboratorsFactory({ db })(project.id)
// Delete project in main db to "unblock" replication
await deleteProjectFactory({ db })({ projectId: project.id })
try {
// Wait for replication from regional db
await waitForRegionProjectFactory({
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db })
})({
projectId: project.id,
regionKey,
maxAttempts: 100
})
} catch (err) {
// Failed to delete project or await replication, reset project state in main db
await storeProjectFactory({ db })({ project })
throw err
}
// Reinstate project acl records
for (const roles of chunk(projectRoles, 10_000)) {
await storeProjectRolesFactory({ db })({
roles: roles.map((role) => ({
projectId: project.id,
userId: role.id,
role: role.streamRole
}))
})
}
}
case 'delete-project-region-data':
default:
@@ -45,7 +45,11 @@ import {
isMultiRegionTestMode,
waitForRegionUser
} from '@/test/speckle-helpers/regions'
import { BasicTestStream, createTestStream } from '@/test/speckle-helpers/streamHelper'
import {
BasicTestStream,
createTestStream,
getUserStreamRole
} from '@/test/speckle-helpers/streamHelper'
import { retry, Roles } from '@speckle/shared'
import { expect } from 'chai'
import cryptoRandomString from 'crypto-random-string'
@@ -244,7 +248,7 @@ isMultiRegionTestMode()
await assertProjectRegion(testProject.id, regionKey1)
})
it('moves projects with no resources of a given type', async () => {
it('moves project with no resources of a given type', async () => {
const resA = await apollo.execute(UpdateProjectRegionDocument, {
projectId: emptyProject.id,
regionKey: regionKey2
@@ -253,6 +257,53 @@ isMultiRegionTestMode()
await ensureProjectRegion(emptyProject.id, regionKey2)
})
it('moves project to region without breaking the target region', async () => {
// Move a workspace project to region2
const resA = await apollo.execute(UpdateProjectRegionDocument, {
projectId: emptyProject.id,
regionKey: regionKey2
})
expect(resA).to.not.haveGraphQLErrors()
await ensureProjectRegion(emptyProject.id, regionKey2)
// Create a new project in region2
const testRegion2Workspace: BasicTestWorkspace = {
id: '',
ownerId: '',
name: 'My Region 2 Workspace',
slug: 'region-2-workspace'
}
await createTestWorkspace(testRegion2Workspace, adminUser, {
regionKey: regionKey2,
addPlan: {
name: 'unlimited',
status: 'valid'
}
})
const testRegion2Project: BasicTestStream = {
id: '',
ownerId: '',
name: 'My Region 2 Project',
workspaceId: testRegion2Workspace.id
}
await createTestStream(testRegion2Project, adminUser)
await ensureProjectRegion(testRegion2Project.id, regionKey2)
})
it('moves project to region and preserves project roles', async () => {
const resA = await apollo.execute(UpdateProjectRegionDocument, {
projectId: emptyProject.id,
regionKey: regionKey2
})
expect(resA).to.not.haveGraphQLErrors()
await ensureProjectRegion(emptyProject.id, regionKey2)
const role = await getUserStreamRole(adminUser.id, emptyProject.id)
if (!role || role !== Roles.Stream.Owner) {
expect.fail('Did not preserve roles on project after region move.')
}
})
it('moves project record to target regional db', async () => {
const resA = await apollo.execute(UpdateProjectRegionDocument, {
projectId: testProject.id,
@@ -83,15 +83,15 @@ export default {
workspaceId
})) {
await Promise.all(
projects.map((project) =>
scheduleJob({
projects.map(async (project) => {
await scheduleJob({
type: 'move-project-region',
payload: {
projectId: project.id,
regionKey
}
})
)
})
)
}
}
@@ -122,14 +122,15 @@ export default {
})
return await withOperationLogging(
async () =>
await scheduleJob({
async () => {
return await scheduleJob({
type: 'move-project-region',
payload: {
projectId,
regionKey
}
}),
})
},
{
logger,
operationName: 'workspaceProjectMoveToRegion',
@@ -30,7 +30,10 @@ import {
getDb,
getValidDefaultProjectRegionKey
} from '@/modules/multiregion/utils/dbSelector'
import { createNewProjectFactory } from '@/modules/core/services/projects'
import {
createNewProjectFactory,
waitForRegionProjectFactory
} from '@/modules/core/services/projects'
import {
deleteProjectFactory,
getProjectFactory,
@@ -367,11 +370,13 @@ export const createWorkspaceProjectFactory =
// deps not injected to ensure proper DB injection
const createNewProject = createNewProjectFactory({
storeProject: storeProjectFactory({ db: projectDb }),
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db: projectDb }),
storeModel: storeModelFactory({ db: projectDb }),
// THIS MUST GO TO THE MAIN DB
storeProjectRole: storeProjectRoleFactory({ db }),
waitForRegionProject: waitForRegionProjectFactory({
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db: projectDb })
}),
emitEvent: getEventBus().emit
})