feat(multiregion): apply prepared transactions to projects (#5322)

* feat(multiregion): replace user replication

* chore(multiregion): optimise replication

* maybe it's this

* postgres is fun

* once more

* chore(multiregion): only replicate test user creation during multiregion tests

* feat: improved replicate_query logic

* fix: minor

* fix: starting issue

* feat: included user create and delete specs to multiregion

* feat: removed console logs

* fix: user defaults

* fix: multiregion test helper

* fix: update scenarios for users

* refactor(multiregion): swap replicateQuery concept to asMultiregionOperation (#5301)

feat(multiregion): introduced asMultregionOperator, refactor test to user builder classes

* chore: renamings

* fix: remove comments

* feat: remove user replication

* refactor: simplified spec usages

* chore: comments

* chore: branches and favs

* chore: more tests

* chore: more tests

* fix linting

* fix tests

* feat: dropping replication

* refactor: moved project delete to service

* fix: comment

* feat: updateStreamFactory and updateProjectFacotry

* deleteProjectFactory + replicateFactory

* deleteWorkspaceFactory

* fix: selector

* fix: tests

* fix tests, finished createStreamFactory

* feat: simplify changes

* fix: remove comment

* fix: minor strucutres

* fix: moveProjectToRegion

* fix: moved branch creation outside of multiregion scope

* fix: branch creation

* fix: tests

* fix: ci tests

* fix: removed log form test

* fix: on specs, no random regionKeys

* review fixes

* fix: mr comments

* feat: removed test

---------

Co-authored-by: Charles Driesler <chuck@speckle.systems>
This commit is contained in:
Daniel Gak Anagrov
2025-09-04 12:07:19 +01:00
committed by GitHub
parent 6692fdf4aa
commit 399c998fd7
45 changed files with 923 additions and 938 deletions
@@ -21,12 +21,16 @@ import {
saveStreamActivityFactory
} from '@/modules/activitystream/repositories'
import { db } from '@/db/knex'
import {
deleteStreamFactory,
getStreamFactory
} from '@/modules/core/repositories/streams'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { getUserFactory } from '@/modules/core/repositories/users'
import { createTestStream } from '@/test/speckle-helpers/streamHelper'
import { deleteProjectAndCommitsFactory } from '@/modules/core/services/projects'
import { deleteProjectFactory } from '@/modules/core/repositories/projects'
import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits'
import type { DeleteProjectAndCommits } from '@/modules/core/domain/projects/operations'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import { logger } from '@/observability/logging'
import { getProjectReplicationDbs } from '@/modules/multiregion/utils/dbSelector'
const cleanup = async () => {
await truncateTables([StreamActivity.name, Users.name])
@@ -40,7 +44,22 @@ const createActivitySummary = createActivitySummaryFactory({
getActivity: geUserStreamActivityFactory({ db }),
getUser
})
const deleteStream = deleteStreamFactory({ db })
const deleteStreamAndCommits: DeleteProjectAndCommits = async ({ projectId }) =>
asMultiregionalOperation(
async ({ allDbs }) =>
// this is a bit of an overhead, we are issuing delete queries to all regions,
// instead of being selective and clever about figuring out the project DB and only
// deleting from main and the project db
deleteProjectAndCommitsFactory({
deleteProject: replicateFactory(allDbs, deleteProjectFactory),
deleteProjectCommits: replicateFactory(allDbs, deleteProjectCommitsFactory)
})({ projectId }),
{
name: 'deleteStreamAndCommits spec',
logger,
dbs: await getProjectReplicationDbs({ projectId })
}
)
describe('Activity summary @activity', () => {
const userA: BasicTestUser = {
@@ -104,6 +123,7 @@ describe('Activity summary @activity', () => {
async (stream) => (await createTestStream(stream, userA)).id
)
)
await saveActivity({
streamId,
resourceType: StreamResourceTypes.Stream,
@@ -113,7 +133,7 @@ describe('Activity summary @activity', () => {
info: {},
message: 'foo'
})
await deleteStream(streamId)
await deleteStreamAndCommits({ projectId: streamId })
const summary = await createActivitySummary({
userId: userA.id,
streamIds: [streamId],
@@ -57,17 +57,11 @@ import { authorizeResolver } from '@/modules/shared'
import { Roles } from '@speckle/shared'
import { getDefaultRegionFactory } from '@/modules/workspaces/repositories/regions'
import { getDb } from '@/modules/multiregion/utils/dbSelector'
import { createNewProjectFactory } from '@/modules/core/services/projects'
import {
createNewProjectFactory,
waitForRegionProjectFactory
} from '@/modules/core/services/projects'
import {
deleteProjectFactory,
getProjectFactory,
storeProjectFactory,
storeProjectRoleFactory
} from '@/modules/core/repositories/projects'
import { storeModelFactory } from '@/modules/core/repositories/models'
import { getEventBus } from '@/modules/shared/services/eventBus'
import {
getViewerResourceGroupsFactory,
@@ -202,14 +196,11 @@ const command: CommandModule<
const getUser = getUserFactory({ db })
const createNewProject = createNewProjectFactory({
// TODO: this goes as event emmits outside (default model)
// This does not support multiregion
storeProject: storeProjectFactory({ db: projectDb }),
storeModel: storeModelFactory({ db: projectDb }),
// THIS MUST GO TO THE MAIN DB
storeProjectRole: storeProjectRoleFactory({ db }),
waitForRegionProject: waitForRegionProjectFactory({
getProject: getProjectFactory({ db: projectDb }),
deleteProject: deleteProjectFactory({ db: projectDb })
}),
emitEvent: getEventBus().emit
})
@@ -23,8 +23,9 @@ import {
import { getUserFactory } from '@/modules/core/repositories/users'
import { cloneStreamFactory } from '@/modules/core/services/streams/clone'
import type { CommandModule } from 'yargs'
import { asOperation } from '@/modules/shared/command'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import { storeProjectRoleFactory } from '@/modules/core/repositories/projects'
import { db } from '@/db/knex'
const command: CommandModule<
unknown,
@@ -48,34 +49,34 @@ const command: CommandModule<
logger.info(
`Cloning stream ${sourceStreamId} into the account of user ${targetUserId}...`
)
const { id } = await asOperation(
({ emit, db }) => {
const { id } = await asMultiregionalOperation(
({ emit, mainDb, allDbs }) => {
const cloneStream = cloneStreamFactory({
getStream: getStreamFactory({ db }),
getUser: getUserFactory({ db }),
newProjectDb: db,
sourceProjectDb: db,
createStream: createStreamFactory({ db }),
insertCommits: insertCommitsFactory({ db }),
getBatchedStreamCommits: getBatchedStreamCommitsFactory({ db }),
insertStreamCommits: insertStreamCommitsFactory({ db }),
getBatchedStreamBranches: getBatchedStreamBranchesFactory({ db }),
insertBranches: insertBranchesFactory({ db }),
getBatchedBranchCommits: getBatchedBranchCommitsFactory({ db }),
insertBranchCommits: insertBranchCommitsFactory({ db }),
getBatchedStreamComments: getBatchedStreamCommentsFactory({ db }),
insertComments: insertCommentsFactory({ db }),
getCommentLinks: getCommentLinksFactory({ db }),
insertCommentLinks: insertCommentLinksFactory({ db }),
getStream: getStreamFactory({ db: mainDb }),
getUser: getUserFactory({ db: mainDb }),
newProjectDb: mainDb,
sourceProjectDb: mainDb,
createStream: replicateFactory(allDbs, createStreamFactory),
insertCommits: insertCommitsFactory({ db: mainDb }),
getBatchedStreamCommits: getBatchedStreamCommitsFactory({ db: mainDb }),
insertStreamCommits: insertStreamCommitsFactory({ db: mainDb }),
getBatchedStreamBranches: getBatchedStreamBranchesFactory({ db: mainDb }),
insertBranches: insertBranchesFactory({ db: mainDb }),
getBatchedBranchCommits: getBatchedBranchCommitsFactory({ db: mainDb }),
insertBranchCommits: insertBranchCommitsFactory({ db: mainDb }),
getBatchedStreamComments: getBatchedStreamCommentsFactory({ db: mainDb }),
insertComments: insertCommentsFactory({ db: mainDb }),
getCommentLinks: getCommentLinksFactory({ db: mainDb }),
insertCommentLinks: insertCommentLinksFactory({ db: mainDb }),
emitEvent: emit,
storeProjectRole: storeProjectRoleFactory({ db })
storeProjectRole: storeProjectRoleFactory({ db: mainDb })
})
return cloneStream(targetUserId, sourceStreamId)
},
{
transaction: true,
name: 'Clone Stream',
dbs: [db], // Cloning does not support multiregion
logger
}
)
@@ -52,7 +52,6 @@ import {
storeSingleObjectIfNotFoundFactory,
getStreamObjectsFactory
} from '@/modules/core/repositories/objects'
import { legacyUpdateStreamFactory } from '@/modules/core/services/streams/management'
import { createObjectFactory } from '@/modules/core/services/objects/management'
import {
getViewerResourcesFromLegacyIdentifiersFactory,
@@ -63,6 +62,10 @@ import { createProject } from '@/test/projectHelper'
import type { BasicTestUser } from '@/test/authHelper'
import { createTestUser } from '@/test/authHelper'
import { getEventBus } from '@/modules/shared/services/eventBus'
import type { UpdateStreamRecord } from '@/modules/core/domain/streams/operations'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import { logger } from '@/observability/logging'
import { getProjectReplicationDbs } from '@/modules/multiregion/utils/dbSelector'
const markCommitStreamUpdated = markCommitStreamUpdatedFactory({ db })
const streamResourceCheck = streamResourceCheckFactory({
@@ -112,9 +115,16 @@ const createCommitByBranchName = createCommitByBranchNameFactory({
getBranchById: getBranchByIdFactory({ db })
})
const updateStream = legacyUpdateStreamFactory({
updateStream: updateStreamFactory({ db })
})
const updateStream: UpdateStreamRecord = async (update) =>
asMultiregionalOperation(
async ({ allDbs }) => replicateFactory(allDbs, updateStreamFactory)(update),
{
logger,
name: 'updateStream',
dbs: await getProjectReplicationDbs({ projectId: update.id })
}
)
const grantPermissionsStream = grantStreamPermissionsFactory({ db })
const createObject = createObjectFactory({
@@ -45,6 +45,7 @@ export type GetCommit = (
export type DeleteCommits = (commitIds: string[]) => Promise<number>
export type DeleteCommit = (commitId: string) => Promise<boolean>
export type DeleteProjectCommits = (params: { projectId: string }) => Promise<void>
export type DeleteCommitAndNotify = (
commitId: string,
@@ -40,6 +40,7 @@ export type DeleteProjectRole = (args: {
}) => Promise<StreamRecord | undefined>
export type DeleteProject = (args: { projectId: string }) => Promise<void>
export type DeleteProjectAndCommits = (args: { projectId: string }) => Promise<void>
export type GetUserProjectRoles = ({
userId,
@@ -61,7 +62,6 @@ export type ProjectCreateArgs = {
}
export type CreateProject = (params: ProjectCreateArgs) => Promise<Project>
export type StoreProject = (params: { project: Project }) => Promise<void>
export type StoreModel = (params: {
@@ -152,8 +152,6 @@ export type CanUserFavoriteStream = (params: {
streamId: string
}) => Promise<boolean>
export type DeleteStreamRecord = (streamId: string) => Promise<number>
export type GetOnboardingBaseStream = (version: string) => Promise<Optional<Stream>>
export type UpdateStreamRecord = (
@@ -360,10 +358,6 @@ export type UpdateStream = (
updaterId: string
) => Promise<Stream>
export type LegacyUpdateStream = (
update: StreamUpdateInput
) => Promise<Nullable<string>>
export type PermissionUpdateInput =
| StreamUpdatePermissionInput
| StreamRevokePermissionInput
@@ -0,0 +1,36 @@
import type { EventBus, EventPayload } from '@/modules/shared/services/eventBus'
import { ProjectEvents } from '@/modules/core/domain/projects/events'
import type { Logger } from '@/observability/logging'
import type { DependenciesOf } from '@/modules/shared/helpers/factory'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import { storeModelFactory } from '@/modules/core/repositories/models'
const onProjectCreatedFactory =
() =>
async ({ payload }: EventPayload<typeof ProjectEvents.Created>) => {
const { project, ownerId } = payload
const projectDb = await getProjectDbClient({ projectId: project.id })
const storeModel = storeModelFactory({ db: projectDb })
// Legacy flow for creating a default main branch
await storeModel({
name: 'main',
description: 'default model',
projectId: project.id,
authorId: ownerId
})
}
export const projectListenersFactory =
(
deps: { eventBus: EventBus; logger: Logger } & DependenciesOf<
typeof onProjectCreatedFactory
>
) =>
() => {
const onProjectCreated = onProjectCreatedFactory()
const cbs = [deps.eventBus.listen(ProjectEvents.Created, onProjectCreated)]
return () => cbs.forEach((cb) => cb())
}
@@ -16,21 +16,19 @@ import {
toProjectIdWhitelist
} from '@/modules/core/helpers/token'
import {
createBranchFactory,
getBatchedStreamBranchesFactory,
insertBranchesFactory
} from '@/modules/core/repositories/branches'
import {
deleteProjectCommitsFactory,
getBatchedBranchCommitsFactory,
getBatchedStreamCommitsFactory,
insertBranchCommitsFactory,
insertCommitsFactory,
insertStreamCommitsFactory
} from '@/modules/core/repositories/commits'
import { storeModelFactory } from '@/modules/core/repositories/models'
import {
deleteProjectFactory,
getProjectFactory,
storeProjectFactory,
storeProjectRoleFactory
} from '@/modules/core/repositories/projects'
@@ -38,7 +36,6 @@ import { getServerInfoFactory } from '@/modules/core/repositories/server'
import {
getStreamFactory,
createStreamFactory,
deleteStreamFactory,
updateStreamFactory,
revokeStreamPermissionsFactory,
grantStreamPermissionsFactory,
@@ -50,7 +47,7 @@ import {
import { getUserFactory, getUsersFactory } from '@/modules/core/repositories/users'
import {
createNewProjectFactory,
waitForRegionProjectFactory
deleteProjectAndCommitsFactory
} from '@/modules/core/services/projects'
import { throwIfRateLimitedFactory } from '@/modules/core/utils/ratelimiter'
import {
@@ -69,8 +66,9 @@ import {
import { createOnboardingStreamFactory } from '@/modules/core/services/streams/onboarding'
import { getOnboardingBaseProjectFactory } from '@/modules/cross-server-sync/services/onboardingProject'
import {
getDb,
getProjectDbClient,
getProjectReplicationDbs,
getReplicationDbs,
getValidDefaultProjectRegionKey
} from '@/modules/multiregion/utils/dbSelector'
import {
@@ -119,8 +117,9 @@ import { sendEmail } from '@/modules/emails/services/sending'
import { ProjectRecordVisibility } from '@/modules/core/helpers/types'
import { mapDbToGqlProjectVisibility } from '@/modules/core/helpers/project'
import { StreamNotFoundError } from '@/modules/core/errors/stream'
import { asOperation } from '@/modules/shared/command'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import type { Knex } from 'knex'
import type { Logger } from '@/observability/logging'
const getUser = getUserFactory({ db })
const getStream = getStreamFactory({ db })
@@ -200,6 +199,32 @@ const throwIfRateLimited = throwIfRateLimitedFactory({
rateLimiterEnabled: isRateLimiterEnabled()
})
const deleteStreamAndNotify = async (
projectId: string,
userId: string,
ctxLogger: Logger
) =>
asMultiregionalOperation(
({ allDbs, mainDb, emit }) => {
const deleteStreamAndNotify = deleteStreamAndNotifyFactory({
deleteProjectAndCommits: deleteProjectAndCommitsFactory({
deleteProject: replicateFactory(allDbs, deleteProjectFactory),
deleteProjectCommits: replicateFactory(allDbs, deleteProjectCommitsFactory)
}),
emitEvent: emit,
deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db: mainDb }),
getStream: getStreamFactory({ db: mainDb })
})
return deleteStreamAndNotify(projectId, userId)
},
{
logger: ctxLogger,
name: 'delete project',
description: `Cascade deleting a project`,
dbs: await getProjectReplicationDbs({ projectId })
}
)
const resolvers: Resolvers = {
Query: {
async project(_parent, args, context) {
@@ -268,27 +293,8 @@ const resolvers: Resolvers = {
})
)
const results = await withOperationLogging(
async () =>
await Promise.all(
args.ids.map(async (id) => {
const projectDb = await getProjectDbClient({ projectId: id })
const deleteStreamAndNotify = deleteStreamAndNotifyFactory({
deleteStream: deleteStreamFactory({
db: projectDb
}),
emitEvent: getEventBus().emit,
deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }),
getStream: getStreamFactory({ db: projectDb })
})
return deleteStreamAndNotify(id, ctx.userId!)
})
),
{
logger: ctx.log,
operationName: 'projectBatchDelete',
operationDescription: `Delete multiple projects`
}
const results = await Promise.all(
args.ids.map((id) => deleteStreamAndNotify(id, ctx.userId!, ctx.log))
)
return results.every((res) => res === true)
},
@@ -314,27 +320,11 @@ const resolvers: Resolvers = {
})
throwIfAuthNotOk(canDelete)
const projectDb = await getProjectDbClient({ projectId })
const deleteStreamAndNotify = deleteStreamAndNotifyFactory({
deleteStream: deleteStreamFactory({
db: projectDb
}),
emitEvent: getEventBus().emit,
deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }),
getStream: getStreamFactory({ db: projectDb })
})
return await withOperationLogging(
async () => await deleteStreamAndNotify(projectId, userId!),
{
logger,
operationName: 'projectDelete',
operationDescription: `Delete a project`
}
)
return deleteStreamAndNotify(projectId, userId!, logger)
},
async createForOnboarding(_parent, _args, { userId, resourceAccessRules, log }) {
return await asOperation(
async ({ db: mainDb, emit }) => {
return await asMultiregionalOperation(
async ({ mainDb, emit, allDbs }) => {
// We want to read & write from main DB - this isn't occurring in a multi region workspace ctx
const createOnboardingStream = createOnboardingStreamFactory({
getOnboardingBaseProject: getOnboardingBaseProjectFactory({
@@ -345,7 +335,7 @@ const resolvers: Resolvers = {
getUser: getUserFactory({ db: mainDb }),
newProjectDb: mainDb,
sourceProjectDb: mainDb,
createStream: createStreamFactory({ db: mainDb }),
createStream: replicateFactory(allDbs, createStreamFactory),
insertCommits: insertCommitsFactory({ db: mainDb }),
getBatchedStreamCommits: getBatchedStreamCommitsFactory({ db: mainDb }),
insertStreamCommits: insertStreamCommitsFactory({ db: mainDb }),
@@ -381,13 +371,12 @@ const resolvers: Resolvers = {
}),
getUsers: getUsersFactory({ db: mainDb })
}),
createStream: createStreamFactory({ db: mainDb }),
createBranch: createBranchFactory({ db: mainDb }),
createStream: replicateFactory(allDbs, createStreamFactory),
storeProjectRole: storeProjectRoleFactory({ db: mainDb }),
emitEvent: emit
}),
getUser: getUserFactory({ db: mainDb }),
updateStream: updateStreamFactory({ db: mainDb })
updateStream: replicateFactory(allDbs, updateStreamFactory)
})
return await createOnboardingStream({
@@ -399,6 +388,7 @@ const resolvers: Resolvers = {
{
logger: log,
name: 'createOnboardingProject',
dbs: [db], // Cloning does not support multiregion
description: `Create a project for onboarding`
}
)
@@ -426,18 +416,20 @@ const resolvers: Resolvers = {
})
throwIfAuthNotOk(canUpdate)
const projectDB = await getProjectDbClient({ projectId })
const updateStreamAndNotify = updateStreamAndNotifyFactory({
getStream: getStreamFactory({ db: projectDB }),
updateStream: updateStreamFactory({ db: projectDB }),
emitEvent: getEventBus().emit
})
const res = await withOperationLogging(
async () => await updateStreamAndNotify(update, userId!),
const res = await asMultiregionalOperation(
async ({ mainDb, allDbs, emit }) => {
const updateStreamAndNotify = updateStreamAndNotifyFactory({
getStream: getStreamFactory({ db: mainDb }),
updateStream: replicateFactory(allDbs, updateStreamFactory),
emitEvent: emit
})
return await updateStreamAndNotify(update, userId!)
},
{
logger,
operationName: 'projectUpdate',
operationDescription: `Update a project`
name: 'Update Project',
dbs: await getProjectReplicationDbs({ projectId })
}
)
@@ -465,31 +457,25 @@ const resolvers: Resolvers = {
throwIfAuthNotOk(canCreate)
const regionKey = await getValidDefaultProjectRegionKey()
const projectDb = await getDb({ regionKey })
const project = await asMultiregionalOperation(
async ({ allDbs, mainDb, emit }) => {
const createNewProject = createNewProjectFactory({
storeProject: replicateFactory(allDbs, storeProjectFactory),
storeProjectRole: storeProjectRoleFactory({ db: mainDb }),
emitEvent: emit
})
const createNewProject = createNewProjectFactory({
storeProject: storeProjectFactory({ db: projectDb }),
storeModel: storeModelFactory({ db: projectDb }),
// THIS MUST GO TO THE MAIN DB
storeProjectRole: storeProjectRoleFactory({ db }),
waitForRegionProject: waitForRegionProjectFactory({
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db: projectDb })
}),
emitEvent: getEventBus().emit
})
const project = await withOperationLogging(
async () =>
await createNewProject({
return createNewProject({
...(args.input || {}),
ownerId: context.userId!,
regionKey
}),
})
},
{
logger,
operationName: 'projectCreate',
operationDescription: `Create a new project`
name: 'projectCreate',
dbs: await getReplicationDbs({ regionKey }),
description: `Create a new project`
}
)
@@ -13,7 +13,6 @@ import { get } from 'lodash-es'
import {
getStreamFactory,
createStreamFactory,
deleteStreamFactory,
updateStreamFactory,
revokeStreamPermissionsFactory,
grantStreamPermissionsFactory,
@@ -64,7 +63,6 @@ import { createAndSendInviteFactory } from '@/modules/serverinvites/services/cre
import { collectAndValidateCoreTargetsFactory } from '@/modules/serverinvites/services/coreResourceCollection'
import { buildCoreInviteEmailContentsFactory } from '@/modules/serverinvites/services/coreEmailContents'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { createBranchFactory } from '@/modules/core/repositories/branches'
import {
addOrUpdateStreamCollaboratorFactory,
isStreamCollaboratorFactory,
@@ -103,7 +101,15 @@ import { renderEmail } from '@/modules/emails/services/emailRendering'
import { sendEmail } from '@/modules/emails/services/sending'
import { ProjectRecordVisibility } from '@/modules/core/helpers/types'
import { throwIfAuthNotOk } from '@/modules/shared/helpers/errorHelper'
import { storeProjectRoleFactory } from '@/modules/core/repositories/projects'
import {
deleteProjectFactory,
storeProjectRoleFactory
} from '@/modules/core/repositories/projects'
import { deleteProjectAndCommitsFactory } from '@/modules/core/services/projects'
import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import { getProjectReplicationDbs } from '@/modules/multiregion/utils/dbSelector'
import type { Logger } from '@/observability/logging'
const getServerInfo = getServerInfoFactory({ db })
const getUsers = getUsersFactory({ db })
@@ -160,44 +166,32 @@ const buildFinalizeProjectInvite = () =>
getServerInfo
})
const createStreamReturnRecord = createStreamReturnRecordFactory({
inviteUsersToProject: inviteUsersToProjectFactory({
createAndSendInvite: createAndSendInviteFactory({
findUserByTarget: findUserByTargetFactory({ db }),
insertInviteAndDeleteOld: insertInviteAndDeleteOldFactory({ db }),
collectAndValidateResourceTargets: collectAndValidateCoreTargetsFactory({
getStream
}),
buildInviteEmailContents: buildCoreInviteEmailContentsFactory({
getStream
}),
emitEvent: ({ eventName, payload }) =>
getEventBus().emit({
eventName,
payload
const deleteStreamAndNotify = async (
projectId: string,
userId: string,
ctxLogger: Logger
) =>
asMultiregionalOperation(
({ allDbs, mainDb, emit }) => {
const deleteStreamAndNotify = deleteStreamAndNotifyFactory({
deleteProjectAndCommits: deleteProjectAndCommitsFactory({
deleteProject: replicateFactory(allDbs, deleteProjectFactory),
deleteProjectCommits: replicateFactory(allDbs, deleteProjectCommitsFactory)
}),
getUser,
getServerInfo,
finalizeInvite: buildFinalizeProjectInvite()
}),
getUsers
}),
createStream: createStreamFactory({ db }),
createBranch: createBranchFactory({ db }),
storeProjectRole: storeProjectRoleFactory({ db }),
emitEvent: getEventBus().emit
})
const deleteStreamAndNotify = deleteStreamAndNotifyFactory({
deleteStream: deleteStreamFactory({ db }),
emitEvent: getEventBus().emit,
deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }),
getStream
})
const updateStreamAndNotify = updateStreamAndNotifyFactory({
getStream,
updateStream: updateStreamFactory({ db }),
emitEvent: getEventBus().emit
})
emitEvent: emit,
deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db: mainDb }),
getStream: getStreamFactory({ db: mainDb })
})
return deleteStreamAndNotify(projectId, userId)
},
{
logger: ctxLogger,
name: 'delete project',
description: `Cascade deleting a project`,
dbs: await getProjectReplicationDbs({ projectId })
}
)
const validateStreamAccess = validateStreamAccessFactory({ authorizeResolver })
const isStreamCollaborator = isStreamCollaboratorFactory({
getStream
@@ -513,17 +507,43 @@ export default {
})
throwIfAuthNotOk(canCreate)
const { id } = await withOperationLogging(
async () =>
await createStreamReturnRecord({
const { id } = await asMultiregionalOperation(
async ({ allDbs, mainDb, emit }) =>
createStreamReturnRecordFactory({
inviteUsersToProject: inviteUsersToProjectFactory({
createAndSendInvite: createAndSendInviteFactory({
findUserByTarget: findUserByTargetFactory({ db: mainDb }),
insertInviteAndDeleteOld: insertInviteAndDeleteOldFactory({
db: mainDb
}),
collectAndValidateResourceTargets: collectAndValidateCoreTargetsFactory(
{
getStream: getStreamFactory({ db: mainDb })
}
),
buildInviteEmailContents: buildCoreInviteEmailContentsFactory({
getStream: getStreamFactory({ db: mainDb })
}),
emitEvent: emit,
getUser: getUserFactory({ db: mainDb }),
getServerInfo: getServerInfoFactory({ db: mainDb }),
finalizeInvite: buildFinalizeProjectInvite()
}),
getUsers: getUsersFactory({ db: mainDb })
}),
createStream: replicateFactory(allDbs, createStreamFactory),
storeProjectRole: storeProjectRoleFactory({ db: mainDb }),
emitEvent: emit
})({
...args.stream,
ownerId: context.userId!,
ownerResourceAccessRules: context.resourceAccessRules
}),
{
logger: context.log,
operationName: 'createStream',
operationDescription: `Create a new Stream`
name: 'createStream',
description: `Create a new Stream`,
dbs: [db] // legacy; no multiregion ctx
}
)
@@ -549,12 +569,20 @@ export default {
streamId: projectId //legacy
})
await withOperationLogging(
async () => await updateStreamAndNotify(args.stream, context.userId!),
await asMultiregionalOperation(
async ({ mainDb, allDbs, emit }) => {
const updateStreamAndNotify = updateStreamAndNotifyFactory({
getStream: getStreamFactory({ db: mainDb }),
updateStream: replicateFactory(allDbs, updateStreamFactory),
emitEvent: emit
})
await updateStreamAndNotify(args.stream, context.userId!)
},
{
logger,
operationName: 'updateStream',
operationDescription: `Update a Stream`
name: 'updateStream',
dbs: await getProjectReplicationDbs({ projectId })
}
)
return true
@@ -579,43 +607,29 @@ export default {
streamId: projectId //legacy
})
return await withOperationLogging(
async () => await deleteStreamAndNotify(args.id, context.userId!),
{
logger,
operationName: 'deleteStream',
operationDescription: `Delete a Stream`
}
)
return await deleteStreamAndNotify(args.id, context.userId!, logger)
},
async streamsDelete(_, args, context) {
const logger = context.log
const results = await withOperationLogging(
async () =>
await Promise.all(
(args.ids || []).map(async (id) => {
throwIfResourceAccessNotAllowed({
resourceId: id,
resourceType: TokenResourceIdentifierType.Project,
resourceAccessRules: context.resourceAccessRules
})
const canDelete = await context.authPolicies.project.canDelete({
userId: context.userId!,
projectId: id
})
throwIfAuthNotOk(canDelete)
const results = await Promise.all(
(args.ids || []).map(async (id) => {
throwIfResourceAccessNotAllowed({
resourceId: id,
resourceType: TokenResourceIdentifierType.Project,
resourceAccessRules: context.resourceAccessRules
})
const canDelete = await context.authPolicies.project.canDelete({
userId: context.userId!,
projectId: id
})
throwIfAuthNotOk(canDelete)
return await deleteStreamAndNotify(id, context.userId!)
})
),
{
logger,
operationName: 'deleteStreams',
operationDescription: `Delete one or more Streams`
}
return await deleteStreamAndNotify(id, context.userId!, logger)
})
)
return results.every((res) => res === true)
},
@@ -32,7 +32,6 @@ import {
changeUserRoleFactory
} from '@/modules/core/services/users/management'
import {
deleteStreamFactory,
getExplicitProjects,
getUserDeletableStreamsFactory
} from '@/modules/core/repositories/streams'
@@ -47,13 +46,22 @@ import {
import { updateMailchimpMemberTags } from '@/modules/auth/services/mailchimp'
import { withOperationLogging } from '@/observability/domain/businessLogging'
import { metaHelpers } from '@/modules/core/helpers/meta'
import { asMultiregionalOperation, asOperation } from '@/modules/shared/command'
import {
asMultiregionalOperation,
asOperation,
replicateFactory
} from '@/modules/shared/command'
import { setUserOnboardingChoicesFactory } from '@/modules/core/services/users/tracking'
import { getMixpanelClient } from '@/modules/shared/utils/mixpanel'
import { throwIfAuthNotOk } from '@/modules/shared/helpers/errorHelper'
import { getUserWorkspaceSeatsFactory } from '@/modules/workspacesCore/repositories/workspaces'
import { queryAllProjectsFactory } from '@/modules/core/services/projects'
import {
deleteProjectAndCommitsFactory,
queryAllProjectsFactory
} from '@/modules/core/services/projects'
import { getAllRegisteredDbs } from '@/modules/multiregion/utils/dbSelector'
import { deleteProjectFactory } from '@/modules/core/repositories/projects'
import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits'
const getUser = legacyGetUserFactory({ db })
const getUserByEmail = legacyGetUserByEmailFactory({ db })
@@ -309,7 +317,16 @@ export default {
await asMultiregionalOperation(
({ mainDb, allDbs, emit }) => {
const deleteUser = deleteUserFactory({
deleteStream: deleteStreamFactory({ db: mainDb }),
deleteProjectAndCommits: deleteProjectAndCommitsFactory({
// this is a bit of an overhead, we are issuing delete queries to all regions,
// instead of being selective and clever about figuring out the project DB and only
// deleting from main and the project db
deleteProject: replicateFactory(allDbs, deleteProjectFactory),
deleteProjectCommits: replicateFactory(
allDbs,
deleteProjectCommitsFactory
)
}),
logger: dbLogger,
isLastAdminUser: isLastAdminUserFactory({ db: mainDb }),
getUserDeletableStreams: getUserDeletableStreamsFactory({ db: mainDb }),
@@ -359,7 +376,16 @@ export default {
await asMultiregionalOperation(
({ mainDb, allDbs, emit }) => {
const deleteUser = deleteUserFactory({
deleteStream: deleteStreamFactory({ db: mainDb }),
deleteProjectAndCommits: deleteProjectAndCommitsFactory({
// this is a bit of an overhead, we are issuing delete queries to all regions,
// instead of being selective and clever about figuring out the project DB and only
// deleting from main and the project db
deleteProject: replicateFactory(allDbs, deleteProjectFactory),
deleteProjectCommits: replicateFactory(
allDbs,
deleteProjectCommitsFactory
)
}),
logger: dbLogger,
isLastAdminUser: isLastAdminUserFactory({ db: mainDb }),
getUserDeletableStreams: getUserDeletableStreamsFactory({ db: mainDb }),
+6
View File
@@ -45,6 +45,7 @@ import {
import { getServerTotalModelCountFactory } from '@/modules/core/services/branch/retrieval'
import { getServerTotalVersionCountFactory } from '@/modules/core/services/commit/retrieval'
import { bullMonitoringRouterFactory } from '@/modules/core/rest/monitoring'
import { projectListenersFactory } from '@/modules/core/events/projectListeners'
let stopTestSubs: (() => void) | undefined = undefined
@@ -121,6 +122,11 @@ const coreModule: SpeckleModule<{
logger: coreLogger
})
})()
projectListenersFactory({
eventBus: getEventBus(),
logger: coreLogger
})()
}
},
async finalize({ app }) {
@@ -58,7 +58,8 @@ import type {
LegacyGetPaginatedUserCommitsPage,
LegacyGetPaginatedUserCommitsTotalCount,
LegacyGetPaginatedStreamCommitsPage,
GetTotalVersionCount
GetTotalVersionCount,
DeleteProjectCommits
} from '@/modules/core/domain/commits/operations'
const tables = {
@@ -156,6 +157,21 @@ export const deleteCommitFactory =
return !!delCount
}
export const deleteProjectCommitsFactory =
(deps: { db: Knex }): DeleteProjectCommits =>
async ({ projectId }) => {
await deps.db.raw(
`
DELETE FROM commits WHERE id IN (
SELECT sc."commitId" FROM streams s
INNER JOIN stream_commits sc ON s.id = sc."streamId"
WHERE s.id = ?
)
`,
[projectId]
)
}
export const getBatchedStreamCommitsFactory =
(deps: { db: Knex }): GetBatchedStreamCommits =>
(streamId: string, options?: Partial<BatchedSelectOptions>) => {
@@ -79,7 +79,6 @@ import type {
GetStream,
GetStreamCollaborators,
GetStreams,
DeleteStreamRecord,
UpdateStreamRecord,
RevokeStreamPermissions,
GrantStreamPermissions,
@@ -890,6 +889,7 @@ export const getUserStreamsCountFactory =
const [res] = await countQuery
return parseInt(res.count)
}
export const createStreamFactory =
(deps: { db: Knex }): SaveStream =>
async (input) => {
@@ -961,23 +961,6 @@ export const getUserStreamCountsFactory =
return mapValues(keyBy(results, 'userId'), (r) => parseInt(r.count))
}
export const deleteStreamFactory =
(deps: { db: Knex }): DeleteStreamRecord =>
async (streamId: string) => {
// Delete stream commits (not automatically cascaded)
await deps.db.raw(
`
DELETE FROM commits WHERE id IN (
SELECT sc."commitId" FROM streams s
INNER JOIN stream_commits sc ON s.id = sc."streamId"
WHERE s.id = ?
)
`,
[streamId]
)
return await tables.streams(deps.db).where(Streams.col.id, streamId).del()
}
export const getStreamsSourceAppsFactory =
(deps: { db: Knex }): GetStreamsSourceApps =>
async (streamIds: string[]) => {
@@ -1295,6 +1278,7 @@ export const markOnboardingBaseStreamFactory =
if (!stream) {
throw new StreamNotFoundError(`Stream ${streamId} not found`)
}
// this happens outside of the a multiregion ctx
await updateStreamFactory(deps)({
id: streamId,
name: 'Onboarding Stream Local Source - Do Not Delete'
@@ -3,43 +3,32 @@ import { generateProjectName } from '@/modules/core/domain/projects/logic'
import type {
CreateProject,
DeleteProject,
GetProject,
DeleteProjectAndCommits,
QueryAllProjects,
StoreModel,
StoreProject,
StoreProjectRole,
WaitForRegionProject
StoreProjectRole
} from '@/modules/core/domain/projects/operations'
import type {
Project,
StreamWithOptionalRole
} from '@/modules/core/domain/streams/types'
import {
ProjectQueryError,
RegionalProjectCreationError
} from '@/modules/core/errors/projects'
import { StreamNotFoundError } from '@/modules/core/errors/stream'
import { ProjectQueryError } from '@/modules/core/errors/projects'
import { ProjectVisibility } from '@/modules/core/graph/generated/graphql'
import { mapGqlToDbProjectVisibility } from '@/modules/core/helpers/project'
import { isTestEnv } from '@/modules/shared/helpers/envHelper'
import type { EventBusEmit } from '@/modules/shared/services/eventBus'
import { retry } from '@lifeomic/attempt'
import { Roles, TIME_MS } from '@speckle/shared'
import { Roles } from '@speckle/shared'
import cryptoRandomString from 'crypto-random-string'
import type { GetExplicitProjects } from '@/modules/core/domain/streams/operations'
import type { DeleteProjectCommits } from '@/modules/core/domain/commits/operations'
export const createNewProjectFactory =
({
storeProject,
storeProjectRole,
storeModel,
waitForRegionProject,
emitEvent
}: {
storeProject: StoreProject
storeProjectRole: StoreProjectRole
storeModel: StoreModel
waitForRegionProject: WaitForRegionProject
emitEvent: EventBusEmit
}): CreateProject =>
async ({ description, name, regionKey, visibility, workspaceId, ownerId }) => {
@@ -62,20 +51,8 @@ export const createNewProjectFactory =
await storeProject({ project })
const projectId = project.id
// if regionKey, we need to make sure it is actually written and synced
if (regionKey) {
await waitForRegionProject({
projectId,
regionKey
})
}
await storeProjectRole({ projectId, userId: ownerId, role: Roles.Stream.Owner })
await storeModel({
name: 'main',
description: 'default model',
projectId,
authorId: ownerId
})
await emitEvent({
eventName: ProjectEvents.Created,
@@ -103,33 +80,6 @@ export const createNewProjectFactory =
return project
}
export const waitForRegionProjectFactory =
(deps: {
getProject: GetProject
deleteProject: DeleteProject
}): WaitForRegionProject =>
async ({ projectId, regionKey, maxAttempts = 10 }) => {
try {
await retry(
async () => {
const replicatedProject = await deps.getProject({ projectId })
if (!replicatedProject) throw new StreamNotFoundError()
},
{ maxAttempts, delay: isTestEnv() ? TIME_MS.second : undefined }
)
} catch (err) {
if (err instanceof StreamNotFoundError) {
// delete from region
await deps.deleteProject({ projectId })
throw new RegionalProjectCreationError(undefined, {
info: { projectId, regionKey }
})
}
// else throw as is
throw err
}
}
export const queryAllProjectsFactory = ({
getExplicitProjects
}: {
@@ -163,3 +113,13 @@ export const queryAllProjectsFactory = ({
iterationCount++
} while (!!currentCursor)
}
export const deleteProjectAndCommitsFactory =
(deps: {
deleteProject: DeleteProject
deleteProjectCommits: DeleteProjectCommits
}): DeleteProjectAndCommits =>
async (project) => {
await deps.deleteProjectCommits(project)
await deps.deleteProject(project)
}
@@ -23,11 +23,9 @@ import type {
AddOrUpdateStreamCollaborator,
CreateStream,
DeleteStream,
DeleteStreamRecord,
GetStream,
IsStreamCollaborator,
LegacyCreateStream,
LegacyUpdateStream,
PermissionUpdateInput,
RemoveStreamCollaborator,
SaveStream,
@@ -35,11 +33,13 @@ import type {
UpdateStreamRecord,
UpdateStreamRole
} from '@/modules/core/domain/streams/operations'
import type { StoreBranch } from '@/modules/core/domain/branches/operations'
import type { DeleteAllResourceInvites } from '@/modules/serverinvites/domain/operations'
import type { EventBusEmit } from '@/modules/shared/services/eventBus'
import { ProjectEvents } from '@/modules/core/domain/projects/events'
import type { StoreProjectRole } from '@/modules/core/domain/projects/operations'
import type {
DeleteProjectAndCommits,
StoreProjectRole
} from '@/modules/core/domain/projects/operations'
import { generateProjectName } from '@/modules/core/domain/projects/logic'
import cryptoRandomString from 'crypto-random-string'
@@ -47,7 +47,6 @@ export const createStreamReturnRecordFactory =
(deps: {
createStream: SaveStream
storeProjectRole: StoreProjectRole
createBranch: StoreBranch
inviteUsersToProject: ReturnType<typeof inviteUsersToProjectFactory>
emitEvent: EventBusEmit
}): CreateStream =>
@@ -87,14 +86,6 @@ export const createStreamReturnRecordFactory =
})
}
// Create a default main branch
await deps.createBranch({
name: 'main',
description: 'default branch',
streamId,
authorId: ownerId
})
// Invite contributors?
if (!isProjectCreateInput(params) && params.withContributors?.length) {
// TODO: should be injected in the resolver
@@ -144,7 +135,7 @@ export const legacyCreateStreamFactory =
*/
export const deleteStreamAndNotifyFactory =
(deps: {
deleteStream: DeleteStreamRecord
deleteProjectAndCommits: DeleteProjectAndCommits
deleteAllResourceInvites: DeleteAllResourceInvites
getStream: GetStream
emitEvent: EventBusEmit
@@ -178,7 +169,7 @@ export const deleteStreamAndNotifyFactory =
resourceId: streamId,
resourceType: ProjectInviteResourceType
}),
deps.deleteStream(streamId)
deps.deleteProjectAndCommits({ projectId: streamId })
])
return true
}
@@ -218,16 +209,6 @@ export const updateStreamAndNotifyFactory =
return newStream
}
/**
* @deprecated Use updateStreamAndNotifyFactory() or the repo fn directly
*/
export const legacyUpdateStreamFactory =
(deps: { updateStream: UpdateStreamRecord }): LegacyUpdateStream =>
async (update) => {
const updatedStream = await deps.updateStream(update)
return updatedStream?.id || null
}
const isProjectUpdateRoleInput = (
i: PermissionUpdateInput
): i is ProjectUpdateRoleInput => has(i, 'projectId')
@@ -39,10 +39,7 @@ import type {
FindPrimaryEmailForUser,
ValidateAndCreateUserEmail
} from '@/modules/core/domain/userEmails/operations'
import type {
DeleteStreamRecord,
GetUserDeletableStreams
} from '@/modules/core/domain/streams/operations'
import type { GetUserDeletableStreams } from '@/modules/core/domain/streams/operations'
import type { Logger } from '@/observability/logging'
import type { DeleteAllUserInvites } from '@/modules/serverinvites/domain/operations'
import type { GetServerInfo } from '@/modules/core/domain/server/operations'
@@ -52,7 +49,10 @@ import { getFeatureFlags } from '@/modules/shared/helpers/envHelper'
import type { GetUserWorkspaceSeatsFactory } from '@/modules/workspacesCore/domain/operations'
import { WorkspaceEvents } from '@/modules/workspacesCore/domain/events'
import { ProjectEvents } from '@/modules/core/domain/projects/events'
import type { QueryAllProjects } from '@/modules/core/domain/projects/operations'
import type {
DeleteProjectAndCommits,
QueryAllProjects
} from '@/modules/core/domain/projects/operations'
import type { StreamWithOptionalRole } from '@/modules/core/repositories/streams'
import { v4 } from 'uuid'
@@ -293,7 +293,7 @@ export const findOrCreateUserFactory =
export const deleteUserFactory =
(deps: {
deleteStream: DeleteStreamRecord
deleteProjectAndCommits: DeleteProjectAndCommits
logger: Logger
isLastAdminUser: IsLastAdminUser
getUserDeletableStreams: GetUserDeletableStreams
@@ -312,7 +312,7 @@ export const deleteUserFactory =
const streamIds = await deps.getUserDeletableStreams(id)
for (const id of streamIds) {
await deps.deleteStream(id)
await deps.deleteProjectAndCommits({ projectId: id })
}
// Delete all invites (they don't have a FK, so we need to do this manually)
@@ -17,7 +17,7 @@ export const buildTestProject = (overrides?: Partial<Project>): Project =>
updatedAt: new Date(),
allowPublicComments: false,
workspaceId: cryptoRandomString({ length: 10 }),
regionKey: cryptoRandomString({ length: 4 }),
regionKey: null,
visibility: ProjectRecordVisibility.Private
},
overrides
@@ -14,6 +14,10 @@ import { Roles } from '@speckle/shared'
import { expect } from 'chai'
import cryptoRandomString from 'crypto-random-string'
import { assign } from 'lodash-es'
import type { DeleteProject } from '@/modules/core/domain/projects/operations'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import { logger } from '@/observability/logging'
import { getProjectReplicationDbs } from '@/modules/multiregion/utils/dbSelector'
const createTestProject = (overrides?: Partial<Project>): Project => {
const defaults: Project = {
@@ -33,7 +37,16 @@ const createTestProject = (overrides?: Partial<Project>): Project => {
const storeProject = storeProjectFactory({ db })
const getProject = getProjectFactory({ db })
const deleteProject = deleteProjectFactory({ db })
const deleteProject: DeleteProject = async ({ projectId }) =>
asMultiregionalOperation(
async ({ allDbs }) =>
await replicateFactory(allDbs, deleteProjectFactory)({ projectId }),
{
name: 'delete spec',
logger,
dbs: await getProjectReplicationDbs({ projectId })
}
)
const storeProjectRole = storeProjectRoleFactory({ db })
describe('project repositories @core', () => {
@@ -77,9 +90,6 @@ describe('project repositories @core', () => {
})
})
describe('deleteProjectFactory creates a function, that', () => {
it('does nothing if project does not exist', async () => {
await deleteProject({ projectId: cryptoRandomString({ length: 10 }) })
})
it('deletes the project', async () => {
const project = createTestProject()
await storeProject({ project })
@@ -10,6 +10,7 @@ import {
} from '@/modules/core/repositories/branches'
import {
deleteCommitsFactory,
deleteProjectCommitsFactory,
getCommitBranchFactory,
getCommitFactory,
getCommitsFactory,
@@ -17,7 +18,6 @@ import {
updateCommitFactory
} from '@/modules/core/repositories/commits'
import {
deleteStreamFactory,
getCommitStreamFactory,
getStreamFactory,
getStreamRolesFactory,
@@ -45,7 +45,10 @@ import {
deleteStreamAndNotifyFactory,
updateStreamAndNotifyFactory
} from '@/modules/core/services/streams/management'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import {
getProjectDbClient,
getProjectReplicationDbs
} from '@/modules/multiregion/utils/dbSelector'
import { deleteAllResourceInvitesFactory } from '@/modules/serverinvites/repositories/serverInvites'
import { authorizeResolver } from '@/modules/shared'
import { getEventBus } from '@/modules/shared/services/eventBus'
@@ -97,18 +100,25 @@ import { faker } from '@faker-js/faker'
import type { Optional, ServerScope } from '@speckle/shared'
import { Roles, Scopes, WorkspacePlans } from '@speckle/shared'
import { expect } from 'chai'
import { deleteProjectAndCommitsFactory } from '@/modules/core/services/projects'
import { deleteProjectFactory } from '@/modules/core/repositories/projects'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import type { UpdateStream } from '@/modules/core/domain/streams/operations'
import { logger } from '@/observability/logging'
const validateStreamAccess = validateStreamAccessFactory({ authorizeResolver })
const isStreamCollaborator = isStreamCollaboratorFactory({
getStream: getStreamFactory({ db })
})
// should be wrapped in a multiregion operator
const buildDeleteProject = async (params: { projectId: string; ownerId: string }) => {
const { projectId, ownerId } = params
const projectDb = await getProjectDbClient({ projectId })
const deleteStreamAndNotify = deleteStreamAndNotifyFactory({
deleteStream: deleteStreamFactory({
db: projectDb
deleteProjectAndCommits: deleteProjectAndCommitsFactory({
deleteProject: deleteProjectFactory({ db: projectDb }),
deleteProjectCommits: deleteProjectCommitsFactory({ db: projectDb })
}),
emitEvent: getEventBus().emit,
deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }),
@@ -117,15 +127,23 @@ const buildDeleteProject = async (params: { projectId: string; ownerId: string }
return async () => deleteStreamAndNotify(projectId, ownerId)
}
const buildUpdateProject = async (params: { projectId: string }) => {
const { projectId } = params
const projectDB = await getProjectDbClient({ projectId })
const updateStreamAndNotify = updateStreamAndNotifyFactory({
getStream: getStreamFactory({ db: projectDB }),
updateStream: updateStreamFactory({ db: projectDB }),
emitEvent: getEventBus().emit
})
return updateStreamAndNotify
const updateProject: UpdateStream = async (stream, me) => {
return asMultiregionalOperation(
async ({ mainDb, allDbs, emit }) => {
const updateStreamAndNotify = updateStreamAndNotifyFactory({
getStream: getStreamFactory({ db: mainDb }),
updateStream: replicateFactory(allDbs, updateStreamFactory),
emitEvent: emit
})
return updateStreamAndNotify(stream, me)
},
{
logger,
name: 'updateStream spec',
dbs: await getProjectReplicationDbs({ projectId: stream.id })
}
)
}
const buildUpdateModel = async (params: { projectId: string }) => {
@@ -284,7 +302,6 @@ describe('Core GraphQL Subscriptions (New)', () => {
const triggerProjectUpdate = async () => {
const projectId = randomProject.id
const updateProject = await buildUpdateProject({ projectId })
await updateProject({ id: projectId, name: new Date().toISOString() }, me.id)
}
@@ -589,7 +606,6 @@ describe('Core GraphQL Subscriptions (New)', () => {
workspaceId: myMainWorkspace.id
}
await createTestStreams([[myProj, me]])
const updateProject = await buildUpdateProject({ projectId: myProj.id })
const onUserProjectsUpdated = await meSubClient.subscribe(
OnProjectUpdatedDocument,
@@ -631,7 +647,6 @@ describe('Core GraphQL Subscriptions (New)', () => {
workspaceId: myMainWorkspace.id
}
await createTestStreams([[myProj, me]])
const updateProject = await buildUpdateProject({ projectId: myProj.id })
const onUserProjectsUpdated = await meSubClient.subscribe(
OnProjectUpdatedDocument,
@@ -10,13 +10,9 @@ import {
import type { BasicTestUser } from '@/test/authHelper'
import { createTestUsers } from '@/test/authHelper'
import type { BasicTestStream } from '@/test/speckle-helpers/streamHelper'
import {
createTestStream,
createTestStreams
} from '@/test/speckle-helpers/streamHelper'
import { createTestStream } from '@/test/speckle-helpers/streamHelper'
import type { StreamWithOptionalRole } from '@/modules/core/repositories/streams'
import {
deleteStreamFactory,
getStreamFactory,
getStreamRolesFactory,
getStreamsCollaboratorsFactory,
@@ -55,6 +51,7 @@ import {
} from '@/modules/core/services/commit/management'
import {
createCommitFactory,
deleteProjectCommitsFactory,
insertBranchCommitsFactory,
insertStreamCommitsFactory
} from '@/modules/core/repositories/commits'
@@ -82,6 +79,12 @@ import {
import { changeUserRoleFactory } from '@/modules/core/services/users/management'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import { createObjectFactory } from '@/modules/core/services/objects/management'
import { deleteProjectAndCommitsFactory } from '@/modules/core/services/projects'
import { deleteProjectFactory } from '@/modules/core/repositories/projects'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import { logger } from '@/observability/logging'
import { getProjectReplicationDbs } from '@/modules/multiregion/utils/dbSelector'
import type { UpdateStream } from '@/modules/core/domain/streams/operations'
const getServerInfo = getServerInfoFactory({ db })
const getUser = getUserFactory({ db })
@@ -115,18 +118,46 @@ const createCommitByBranchName = createCommitByBranchNameFactory({
getStreamBranchByName: getStreamBranchByNameFactory({ db }),
getBranchById: getBranchByIdFactory({ db })
})
const deleteStream = deleteStreamAndNotifyFactory({
deleteStream: deleteStreamFactory({ db }),
getStream,
emitEvent: getEventBus().emit,
deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db })
})
const updateStream = updateStreamAndNotifyFactory({
getStream,
updateStream: updateStreamFactory({ db }),
emitEvent: getEventBus().emit
})
const deleteStream = async (projectId: string, userId: string) =>
asMultiregionalOperation(
({ allDbs, mainDb, emit }) => {
const deleteStreamAndNotify = deleteStreamAndNotifyFactory({
deleteProjectAndCommits: deleteProjectAndCommitsFactory({
deleteProject: replicateFactory(allDbs, deleteProjectFactory),
deleteProjectCommits: replicateFactory(allDbs, deleteProjectCommitsFactory)
}),
emitEvent: emit,
deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db: mainDb }),
getStream: getStreamFactory({ db: mainDb })
})
return deleteStreamAndNotify(projectId, userId)
},
{
logger,
name: 'delete project spec',
description: `Cascade deleting a project in all regions`,
dbs: await getProjectReplicationDbs({ projectId })
}
)
const updateStream: UpdateStream = async (stream, userId) =>
asMultiregionalOperation(
async ({ mainDb, allDbs, emit }) => {
const updateStreamAndNotify = updateStreamAndNotifyFactory({
getStream: getStreamFactory({ db: mainDb }),
updateStream: replicateFactory(allDbs, updateStreamFactory),
emitEvent: emit
})
return updateStreamAndNotify(stream, userId)
},
{
logger,
name: 'updateStream',
dbs: await getProjectReplicationDbs({ projectId: stream.id })
}
)
const revokeStreamPermissions = revokeStreamPermissionsFactory({ db })
const validateStreamAccess = validateStreamAccessFactory({
@@ -163,21 +194,8 @@ describe('Streams @core-streams', () => {
id: ''
}
const testStream: BasicTestStream = {
name: 'Test Stream 01',
description: 'wonderful test stream',
isPublic: true,
ownerId: '',
id: ''
}
const secondTestStream: BasicTestStream = {
name: 'Test Stream 02',
description: 'wot',
isPublic: false,
ownerId: '',
id: ''
}
let testStream: BasicTestStream
let secondTestStream: BasicTestStream
let quitters: (() => void)[] = []
@@ -190,10 +208,22 @@ describe('Streams @core-streams', () => {
await beforeEachContext()
await createTestUsers([userOne, userTwo])
await createTestStreams([
[testStream, userOne],
[secondTestStream, userOne]
])
testStream = await createTestStream(
{
name: 'Test Stream 01',
description: 'wonderful test stream',
isPublic: true
},
userOne
)
secondTestStream = await createTestStream(
{
name: 'Test Stream 02',
description: 'wot',
isPublic: false
},
userOne
)
})
afterEach(() => {
@@ -1,14 +1,8 @@
import { ProjectEvents } from '@/modules/core/domain/projects/events'
import type { Project } from '@/modules/core/domain/streams/types'
import { RegionalProjectCreationError } from '@/modules/core/errors/projects'
import { StreamNotFoundError } from '@/modules/core/errors/stream'
import { ProjectRecordVisibility } from '@/modules/core/helpers/types'
import {
createNewProjectFactory,
waitForRegionProjectFactory
} from '@/modules/core/services/projects'
import { createNewProjectFactory } from '@/modules/core/services/projects'
import { isSpecificEventPayload } from '@/modules/shared/services/eventBus'
import { expectToThrow } from '@/test/assertionHelper'
import type { StreamRoles } from '@speckle/shared'
import { Roles } from '@speckle/shared'
import { expect } from 'chai'
@@ -24,10 +18,6 @@ describe('project services @core', () => {
storedProject = project
},
storeProjectRole: async () => {},
storeModel: async () => {},
waitForRegionProject: async () => {
expect.fail()
},
emitEvent: async () => {}
})
const project = await createNewProject({ ownerId })
@@ -47,10 +37,6 @@ describe('project services @core', () => {
storedProject = project
},
storeProjectRole: async () => {},
storeModel: async () => {},
waitForRegionProject: async () => {
expect.fail()
},
emitEvent: async () => {}
})
@@ -71,10 +57,6 @@ describe('project services @core', () => {
storedProject = project
},
storeProjectRole: async () => {},
storeModel: async () => {},
waitForRegionProject: async () => {
expect.fail()
},
emitEvent: async () => {}
})
@@ -93,10 +75,6 @@ describe('project services @core', () => {
storedProject = project
},
storeProjectRole: async () => {},
storeModel: async () => {},
waitForRegionProject: async () => {
expect.fail()
},
emitEvent: async () => {}
})
const project = await createNewProject({ ownerId, visibility: 'PRIVATE' })
@@ -105,72 +83,6 @@ describe('project services @core', () => {
expect(storedProject!.visibility).to.eq(ProjectRecordVisibility.Private)
expect(storedProject!.allowPublicComments).to.be.false
})
it('continues if the project is eventually synced', async () => {
const ownerId = cryptoRandomString({ length: 10 })
let queriedProjectId: string | undefined = undefined
let storedProject: Project | undefined = undefined
let storedProjectRole:
| {
projectId: string
userId: string
role: StreamRoles
}
| undefined = undefined
let storedModel:
| {
name: string
description: string | null
projectId: string
authorId: string
}
| undefined = undefined
let emitedEvent: string | undefined = undefined
let eventPayload: { project: Project; ownerId: string } | undefined = undefined
const createNewProject = createNewProjectFactory({
storeProject: async ({ project }) => {
storedProject = project
},
storeProjectRole: async (args) => {
storedProjectRole = args
},
storeModel: async (args) => {
storedModel = args
},
waitForRegionProject: async ({ projectId }) => {
queriedProjectId = projectId
},
emitEvent: async (payload) => {
if (isSpecificEventPayload(payload, ProjectEvents.Created)) {
emitedEvent = payload.eventName
eventPayload = payload.payload
}
}
})
const project = await createNewProject({
ownerId,
regionKey: cryptoRandomString({ length: 10 })
})
expect(storedProject!.id).to.equal(queriedProjectId)
expect(project).deep.equal(storedProject)
expect(storedProjectRole).deep.equal({
projectId: project.id,
userId: ownerId,
role: Roles.Stream.Owner
})
expect(storedModel).deep.equal({
name: 'main',
description: 'default model',
projectId: project.id,
authorId: ownerId
})
expect(emitedEvent).to.equal(ProjectEvents.Created)
expect(eventPayload).deep.equal({
ownerId,
project,
input: { description: '', name: project.name, visibility: 'PRIVATE' }
})
})
it('successfully creates a project', async () => {
const ownerId = cryptoRandomString({ length: 10 })
@@ -182,14 +94,6 @@ describe('project services @core', () => {
role: StreamRoles
}
| undefined = undefined
let storedModel:
| {
name: string
description: string | null
projectId: string
authorId: string
}
| undefined = undefined
let emitedEvent: string | undefined = undefined
let eventPayload: { project: Project; ownerId: string } | undefined = undefined
const createNewProject = createNewProjectFactory({
@@ -199,12 +103,6 @@ describe('project services @core', () => {
storeProjectRole: async (args) => {
storedProjectRole = args
},
storeModel: async (args) => {
storedModel = args
},
waitForRegionProject: async () => {
expect.fail()
},
emitEvent: async (payload) => {
if (isSpecificEventPayload(payload, ProjectEvents.Created)) {
emitedEvent = payload.eventName
@@ -219,12 +117,6 @@ describe('project services @core', () => {
userId: ownerId,
role: Roles.Stream.Owner
})
expect(storedModel).deep.equal({
name: 'main',
description: 'default model',
projectId: project.id,
authorId: ownerId
})
expect(emitedEvent).to.equal(ProjectEvents.Created)
expect(eventPayload).deep.equal({
ownerId,
@@ -233,49 +125,4 @@ describe('project services @core', () => {
})
})
})
describe('waitForRegionProject creates a function, that', () => {
it('deletes the created project if getProject throws StreamNotFoundError', async () => {
const storedProjectId = cryptoRandomString({ length: 10 })
let deletedProjectId: string | undefined = undefined
const waitForRegionProject = waitForRegionProjectFactory({
getProject: async () => {
throw new StreamNotFoundError()
},
deleteProject: async ({ projectId }) => {
deletedProjectId = projectId
}
})
const err = await expectToThrow(async () => {
await waitForRegionProject({
projectId: storedProjectId,
regionKey: cryptoRandomString({ length: 10 })
})
})
expect(storedProjectId).to.equal(deletedProjectId)
expect(err.message).to.equal(new RegionalProjectCreationError().message)
})
it('just throws the error from the project getter', async () => {
const projectId = cryptoRandomString({ length: 10 })
let deletedProjectId: string | undefined = undefined
const kabumm = 'kabumm'
const waitForRegionProject = waitForRegionProjectFactory({
getProject: async () => {
throw new Error(kabumm)
},
deleteProject: async ({ projectId }) => {
deletedProjectId = projectId
}
})
const err = await expectToThrow(async () => {
await waitForRegionProject({
projectId,
regionKey: cryptoRandomString({ length: 10 })
})
})
expect(deletedProjectId).to.be.undefined
expect(err.message).to.equal(kabumm)
})
})
})
@@ -25,7 +25,8 @@ import {
insertStreamCommitsFactory,
insertBranchCommitsFactory,
legacyGetPaginatedStreamCommitsPageFactory,
getPaginatedBranchCommitsItemsFactory
getPaginatedBranchCommitsItemsFactory,
deleteProjectCommitsFactory
} from '@/modules/core/repositories/commits'
import {
createCommitByBranchIdFactory,
@@ -35,7 +36,6 @@ import {
getStreamFactory,
grantStreamPermissionsFactory,
markCommitStreamUpdatedFactory,
deleteStreamFactory,
getUserDeletableStreamsFactory,
getExplicitProjects
} from '@/modules/core/repositories/streams'
@@ -104,9 +104,12 @@ import { getPaginatedBranchCommitsItemsByNameFactory } from '@/modules/core/serv
import { getPaginatedStreamBranchesFactory } from '@/modules/core/services/branch/retrieval'
import { createObjectFactory } from '@/modules/core/services/objects/management'
import { getUserWorkspaceSeatsFactory } from '@/modules/workspacesCore/repositories/workspaces'
import { queryAllProjectsFactory } from '@/modules/core/services/projects'
import {
deleteProjectAndCommitsFactory,
queryAllProjectsFactory
} from '@/modules/core/services/projects'
import { getTestRegionClients } from '@/modules/multiregion/tests/helpers'
import { asMultiregionalOperation } from '@/modules/shared/command'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import type {
ChangeUserPassword,
CreateValidatedUser,
@@ -114,6 +117,7 @@ import type {
UpdateUserAndNotify
} from '@/modules/core/domain/users/operations'
import { createTestStream } from '@/test/speckle-helpers/streamHelper'
import { deleteProjectFactory } from '@/modules/core/repositories/projects'
const getServerInfo = getServerInfoFactory({ db })
const getUser = legacyGetUserFactory({ db })
@@ -252,7 +256,13 @@ const deleteUser: DeleteUser = async (...input) =>
asMultiregionalOperation(
({ mainDb, allDbs, emit }) => {
const deleteUser = deleteUserFactory({
deleteStream: deleteStreamFactory({ db: mainDb }),
deleteProjectAndCommits: deleteProjectAndCommitsFactory({
// this is a bit of an overhead, we are issuing delete queries to all regions,
// instead of being selective and clever about figuring out the project DB and only
// deleting from main and the project db
deleteProject: replicateFactory(allDbs, deleteProjectFactory),
deleteProjectCommits: replicateFactory(allDbs, deleteProjectCommitsFactory)
}),
logger: dbLogger,
isLastAdminUser: isLastAdminUserFactory({ db: mainDb }),
getUserDeletableStreams: getUserDeletableStreamsFactory({ db: mainDb }),
@@ -37,7 +37,6 @@ import {
deleteAllUserInvitesFactory
} from '@/modules/serverinvites/repositories/serverInvites'
import {
deleteStreamFactory,
getExplicitProjects,
getUserDeletableStreamsFactory
} from '@/modules/core/repositories/streams'
@@ -46,9 +45,17 @@ import { getServerInfoFactory } from '@/modules/core/repositories/server'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { expect } from 'chai'
import { getUserWorkspaceSeatsFactory } from '@/modules/workspacesCore/repositories/workspaces'
import { queryAllProjectsFactory } from '@/modules/core/services/projects'
import {
deleteProjectAndCommitsFactory,
queryAllProjectsFactory
} from '@/modules/core/services/projects'
import type { BasicTestUser } from '@/test/authHelper'
import { createTestUser } from '@/test/authHelper'
import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits'
import { deleteProjectFactory } from '@/modules/core/repositories/projects'
import type { DeleteUser } from '@/modules/core/domain/users/operations'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import { getTestRegionClients } from '@/modules/multiregion/tests/helpers'
const getUsers = legacyGetPaginatedUsersFactory({ db })
const countUsers = legacyGetPaginatedUsersCountFactory({ db })
@@ -82,19 +89,45 @@ const createUser = createUserFactory({
}),
emitEvent: getEventBus().emit
})
const deleteUser = deleteUserFactory({
deleteStream: deleteStreamFactory({ db }),
logger: dbLogger,
isLastAdminUser: isLastAdminUserFactory({ db }),
getUserDeletableStreams: getUserDeletableStreamsFactory({ db }),
queryAllProjects: queryAllProjectsFactory({
getExplicitProjects: getExplicitProjects({ db })
}),
getUserWorkspaceSeats: getUserWorkspaceSeatsFactory({ db }),
deleteAllUserInvites: deleteAllUserInvitesFactory({ db }),
deleteUserRecord: deleteUserRecordFactory({ db }),
emitEvent: getEventBus().emit
})
const deleteUser: DeleteUser = async (...input) =>
asMultiregionalOperation(
({ mainDb, allDbs, emit }) => {
const deleteUser = deleteUserFactory({
deleteProjectAndCommits: deleteProjectAndCommitsFactory({
// this is a bit of an overhead, we are issuing delete queries to all regions,
// instead of being selective and clever about figuring out the project DB and only
// deleting from main and the project db
deleteProject: replicateFactory(allDbs, deleteProjectFactory),
deleteProjectCommits: replicateFactory(allDbs, deleteProjectCommitsFactory)
}),
logger: dbLogger,
isLastAdminUser: isLastAdminUserFactory({ db: mainDb }),
getUserDeletableStreams: getUserDeletableStreamsFactory({ db: mainDb }),
queryAllProjects: queryAllProjectsFactory({
getExplicitProjects: getExplicitProjects({ db: mainDb })
}),
getUserWorkspaceSeats: getUserWorkspaceSeatsFactory({ db: mainDb }),
deleteAllUserInvites: deleteAllUserInvitesFactory({ db: mainDb }),
deleteUserRecord: async (params) => {
const [res] = await Promise.all(
allDbs.map((db) => deleteUserRecordFactory({ db })(params))
)
return res
},
emitEvent: emit
})
return deleteUser(...input)
},
{
logger: dbLogger,
name: 'delete user spec',
dbs: await getTestRegionClients()
}
)
const getUserRole = getUserRoleFactory({ db })
const buildChangeUserRole = (guestModeEnabled = false) =>
changeUserRoleFactory({
@@ -31,15 +31,12 @@ import {
insertBranchCommitsFactory,
insertStreamCommitsFactory
} from '@/modules/core/repositories/commits'
import { storeModelFactory } from '@/modules/core/repositories/models'
import {
getObjectFactory,
getStreamObjectsFactory,
storeSingleObjectIfNotFoundFactory
} from '@/modules/core/repositories/objects'
import {
deleteProjectFactory,
getProjectFactory,
storeProjectFactory,
storeProjectRoleFactory
} from '@/modules/core/repositories/projects'
@@ -59,10 +56,7 @@ import {
getViewerResourcesFromLegacyIdentifiersFactory
} from '@/modules/core/services/commit/viewerResources'
import { createObjectFactory } from '@/modules/core/services/objects/management'
import {
createNewProjectFactory,
waitForRegionProjectFactory
} from '@/modules/core/services/projects'
import { createNewProjectFactory } from '@/modules/core/services/projects'
import { downloadCommitFactory } from '@/modules/cross-server-sync/services/commit'
import { ensureOnboardingProjectFactory } from '@/modules/cross-server-sync/services/onboardingProject'
import { downloadProjectFactory } from '@/modules/cross-server-sync/services/project'
@@ -156,13 +150,9 @@ const crossServerSyncModule: SpeckleModule = {
})
const createNewProject = createNewProjectFactory({
// This happens always outside of multiregion ctx
storeProject: storeProjectFactory({ db }),
storeModel: storeModelFactory({ db }),
storeProjectRole: storeProjectRoleFactory({ db }),
waitForRegionProject: waitForRegionProjectFactory({
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db })
}),
emitEvent: getEventBus().emit
})
@@ -88,21 +88,21 @@ export const updateProjectRegionKeyFactory =
}
export type GetRegionDb = (args: { regionKey: string }) => Promise<Knex>
type GetDefaultDb = () => Knex
export type GetProjectDb = (args: { projectId: string }) => Promise<Knex>
export type GetProjectDb<T extends Knex | undefined = Knex> = (args: {
projectId: string
}) => T | Promise<T>
export const getProjectDbClientFactory =
({
<T extends Knex | undefined>({
getProjectRegionKey,
getDefaultDb,
getRegionDb
}: {
getProjectRegionKey: GetProjectRegionKey
getDefaultDb: GetDefaultDb
getDefaultDb: () => T
getRegionDb: GetRegionDb
}): GetProjectDb =>
}): GetProjectDb<T> =>
async ({ projectId }) => {
const regionKey = await getProjectRegionKey({ projectId })
if (!regionKey) return getDefaultDb()
return getRegionDb({ regionKey })
return getRegionDb({ regionKey }) as Promise<T>
}
@@ -9,20 +9,22 @@ import {
MultiRegionInvalidJobError,
MultiRegionNotYetImplementedError
} from '@/modules/multiregion/errors'
import { getProjectDbClient, getRegionDb } from '@/modules/multiregion/utils/dbSelector'
import {
getProjectDbClient,
getRegionDb,
getReplicationDbs
} from '@/modules/multiregion/utils/dbSelector'
import {
getProjectObjectStorage,
getRegionObjectStorage
} from '@/modules/multiregion/utils/blobStorageSelector'
import {
updateProjectRegionFactory,
moveProjectToRegionFactory,
validateProjectRegionCopyFactory
} from '@/modules/workspaces/services/projectRegions'
import { db } from '@/db/knex'
import {
deleteProjectFactory,
getProjectFactory,
storeProjectFactory,
storeProjectRolesFactory
} from '@/modules/core/repositories/projects'
import { getAvailableRegionsFactory } from '@/modules/workspaces/services/regions'
@@ -36,7 +38,6 @@ import {
import { updateProjectRegionKeyFactory } from '@/modules/multiregion/services/projectRegion'
import { getGenericRedis } from '@/modules/shared/redis/redis'
import { initializeQueue as setupQueue } from '@speckle/shared/queue'
import { getEventBus } from '@/modules/shared/services/eventBus'
import {
copyWorkspaceFactory,
copyProjectsFactory,
@@ -56,9 +57,9 @@ import {
} from '@/modules/workspaces/repositories/projectRegions'
import { withTransaction } from '@/modules/shared/helpers/dbHelper'
import { getRedisUrl } from '@/modules/shared/helpers/envHelper'
import { waitForRegionProjectFactory } from '@/modules/core/services/projects'
import { chunk } from 'lodash-es'
import { getStreamCollaboratorsFactory } from '@/modules/core/repositories/streams'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
const MULTIREGION_QUEUE_NAME = isTestEnv()
? `test:multiregion:${cryptoRandomString({ length: 5 })}`
@@ -173,9 +174,9 @@ export const startQueue = async () => {
.private
// Move project to target region
const project = await withTransaction(
await withTransaction(
async ({ db: targetDbTrx }) => {
const updateProjectRegion = updateProjectRegionFactory({
const moveProjectToRegion = moveProjectToRegionFactory({
getProject: getProjectFactory({ db: sourceDb }),
getAvailableRegions: getAvailableRegionsFactory({
getRegions: getRegionsFactory({ db }),
@@ -230,45 +231,40 @@ export const startQueue = async () => {
}),
countProjectComments: countProjectCommentsFactory({ db: sourceDb }),
countProjectWebhooks: countProjectWebhooksFactory({ db: sourceDb })
}),
updateProjectRegionKey: updateProjectRegionKeyFactory({
upsertProjectRegionKey: upsertProjectRegionKeyFactory({
db: targetDbTrx
}),
cacheDeleteRegionKey: deleteRegionKeyFromCacheFactory({
redis: getGenericRedis()
}),
emitEvent: getEventBus().emit
})
})
return updateProjectRegion({ projectId, regionKey })
await moveProjectToRegion({ projectId, regionKey })
},
{ db: targetDb }
{
db: targetDb
}
)
// Update project region in dbs and update relevant caches
const project = await asMultiregionalOperation(
async ({ allDbs, emit }) =>
updateProjectRegionKeyFactory({
upsertProjectRegionKey: replicateFactory(
allDbs,
upsertProjectRegionKeyFactory
),
cacheDeleteRegionKey: deleteRegionKeyFromCacheFactory({
redis: getGenericRedis()
}),
emitEvent: emit
})({ projectId, regionKey }),
{
name: 'updateProjectRegion',
description: 'Update project region in db and update relevant caches',
logger,
dbs: await getReplicationDbs({ regionKey })
}
)
// Grab project roles for later reinstating
const projectRoles = await getStreamCollaboratorsFactory({ db })(project.id)
// Delete project in main db to "unblock" replication
await deleteProjectFactory({ db })({ projectId: project.id })
try {
// Wait for replication from regional db
await waitForRegionProjectFactory({
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db: targetDb })
})({
projectId: project.id,
regionKey,
maxAttempts: 100
})
} catch (err) {
// Failed to delete project or await replication, reset project state in main db
await storeProjectFactory({ db })({ project })
throw err
}
// Reinstate project acl records
for (const roles of chunk(projectRoles, 10_000)) {
await storeProjectRolesFactory({ db })({
@@ -1,7 +1,6 @@
import { mainDb } from '@/db/knex'
import { getMainObjectStorage } from '@/modules/blobstorage/clients/objectStorage'
import type { DataRegionsConfig } from '@/modules/multiregion/domain/types'
import { isMultiRegionEnabled } from '@/modules/multiregion/helpers'
import {
getMultiRegionConfig,
setMultiRegionConfig
@@ -21,8 +20,10 @@ import {
import type { ExecuteOperationOptions, TestApolloServer } from '@/test/graphqlHelper'
import { testApolloServer } from '@/test/graphqlHelper'
import { beforeEachContext, getRegionKeys } from '@/test/hooks'
import { truncateRegionsSafely } from '@/test/speckle-helpers/regions'
import {
isMultiRegionTestMode,
truncateRegionsSafely
} from '@/test/speckle-helpers/regions'
import { Roles } from '@speckle/shared'
import type { MultiRegionConfig } from '@speckle/shared/environment/db'
import { getConnectionSettings } from '@speckle/shared/environment/db'
@@ -30,7 +31,7 @@ import { expect } from 'chai'
import { merge } from 'lodash-es'
import { resetRegisteredRegions } from '@/modules/multiregion/utils/dbSelector'
const isEnabled = isMultiRegionEnabled()
const isEnabled = isMultiRegionTestMode()
isEnabled
? describe('Multi Region Server Settings @multiregion', () => {
@@ -1,5 +1,8 @@
import { db } from '@/db/knex'
import { getRegisteredRegionClients } from '@/modules/multiregion/utils/dbSelector'
import {
getRegisteredRegionClients,
getReplicationDbs
} from '@/modules/multiregion/utils/dbSelector'
import { isMultiRegionTestMode } from '@/test/speckle-helpers/regions'
import type { Knex } from 'knex'
@@ -10,3 +13,19 @@ export async function getTestRegionClients(): Promise<[Knex, ...Knex[]]> {
const regionDbs = Object.values(regionClients)
return [db, ...regionDbs]
}
export async function getTestRegionClientsForProject({
regionKey
}: {
regionKey?: string
}): Promise<[Knex, ...Knex[]]> {
if (!isMultiRegionTestMode()) return [db]
if (!regionKey) return [db]
const regionClients = await getRegisteredRegionClients()
const regionDb = regionClients[regionKey]
if (!regionDb) return [db]
return await getReplicationDbs({ regionKey })
}
@@ -1,4 +1,4 @@
import { db } from '@/db/knex'
import { db, mainDb } from '@/db/knex'
import type {
GetProjectDb,
GetRegionDb
@@ -7,7 +7,6 @@ import { getProjectDbClientFactory } from '@/modules/multiregion/services/projec
import type { Knex } from 'knex'
import { getRegionFactory } from '@/modules/multiregion/repositories'
import {
DatabaseError,
LogicError,
MisconfiguredEnvironmentError,
TestOnlyLogicError
@@ -20,14 +19,14 @@ import {
getMainRegionConfig
} from '@/modules/multiregion/regionConfig'
import type { MaybeNullOrUndefined } from '@speckle/shared'
import { ensureError, TIME_MS, wait } from '@speckle/shared'
import { TIME_MS, wait } from '@speckle/shared'
import { isTestEnv } from '@/modules/shared/helpers/envHelper'
import { migrateDbToLatest } from '@/db/migrations'
import {
getProjectRegionKey,
getRegisteredRegionConfigs
} from '@/modules/multiregion/utils/regionSelector'
import { get, mapValues } from 'lodash-es'
import { mapValues } from 'lodash-es'
import { isMultiRegionEnabled } from '@/modules/multiregion/helpers'
import { logger } from '@/observability/logging'
@@ -77,11 +76,42 @@ const initializeDbGetter = async (): Promise<GetProjectDb> => {
}
// this guy is the star of the show here
// returns where the project is located
export const getProjectDbClient: GetProjectDb = async ({ projectId }) => {
if (!getter) getter = await initializeDbGetter()
return await getter({ projectId })
}
// helper for replication logic
// returns the replication strategy ( locations where data need to be updated at the same time)
// instead of just the target db
export const getProjectReplicationDbs = async ({
projectId
}: {
projectId: string
}): Promise<[Knex, ...Knex[]]> => {
const getDefaultDb = () => undefined
const projectDb = await getProjectDbClientFactory({
getDefaultDb,
getRegionDb,
getProjectRegionKey
})({ projectId })
return [mainDb, ...(projectDb ? [projectDb] : [])]
}
export const getReplicationDbs = async ({
regionKey
}: {
regionKey: string | null
}): Promise<[Knex, ...Knex[]]> => {
if (!regionKey) {
return [mainDb]
}
return [mainDb, await getRegionDb({ regionKey })]
}
// the default region key is a config value, we're caching this globally
let defaultRegionKeyCache: string | null | undefined = undefined
@@ -102,6 +132,7 @@ export const getValidDefaultProjectRegionKey = async (): Promise<string | null>
type RegionClients = Record<string, Knex>
let registeredRegionClients: RegionClients | undefined = undefined
export type DatabaseClient = { client: Knex; isMain: boolean; regionKey: string }
/**
* Idempotently initialize registered region (in db) Knex clients
@@ -135,9 +166,7 @@ export const getRegisteredRegionClients = async (): Promise<RegionClients> => {
export const getRegisteredDbClients = async (): Promise<Knex[]> =>
Object.values(await getRegisteredRegionClients())
export const getAllRegisteredDbClients = async (): Promise<
Array<{ client: Knex; isMain: boolean; regionKey: string }>
> => {
export const getAllRegisteredDbClients = async (): Promise<Array<DatabaseClient>> => {
const mainDb = db
const regionDbs: RegionClients = isMultiRegionEnabled()
? await getRegisteredRegionClients()
@@ -190,18 +219,22 @@ export const initializeRegion: InitializeRegion = async ({ regionKey }) => {
? 'require'
: 'disable'
await dropUserReplicationIfExists({
await dropReplicationIfExists({
from: mainDb,
to: regionDb,
regionName: regionKey,
sslmode
sslmode,
subName: createPubSubName(`userssub_${regionKey}`),
pubName: createPubSubName('userspub')
})
await setUpProjectReplication({
await dropReplicationIfExists({
from: regionDb,
to: mainDb,
regionName: regionKey,
sslmode
sslmode,
subName: createPubSubName(`projectsub_${regionKey}`),
pubName: createPubSubName('projectpub')
})
}
@@ -219,14 +252,13 @@ interface ReplicationArgs {
regionName: string
}
const dropUserReplicationIfExists = async ({
const dropReplicationIfExists = async ({
from,
to,
regionName
}: ReplicationArgs): Promise<void> => {
const subName = createPubSubName(`userssub_${regionName}`)
const pubName = createPubSubName('userspub')
regionName,
subName,
pubName
}: ReplicationArgs & { subName: string; pubName: string }): Promise<void> => {
try {
const { rows: pubExist } = await from.public.raw(
`SELECT pubname FROM pg_publication WHERE pubname = '${pubName}';`
@@ -278,107 +310,6 @@ const dropUserReplicationIfExists = async ({
return
}
const setUpProjectReplication = async ({
from,
to,
regionName,
sslmode
}: ReplicationArgs): Promise<void> => {
const subName = createPubSubName(`projectsub_${regionName}`)
const pubName = createPubSubName('projectpub')
try {
await from.public.raw(`CREATE PUBLICATION ${pubName} FOR TABLE streams;`)
} catch (err) {
if (!(err instanceof Error))
throw new DatabaseError(
'Could not create publication {pubName} when setting up project replication for region {regionName}',
from.public,
{
cause: ensureError(
sanitizeError(err),
'Unknown database error when creating publication'
),
info: { pubName, regionName }
}
)
if (
!err.message.includes('already exists') &&
!err.message.includes('duplicate key value violates unique constraint')
)
throw new DatabaseError(
'Unknown error while creating publication {pubName} when setting up project replication for region {regionName}',
from.public,
{
cause: ensureError(
sanitizeError(err),
'Unknown database error when creating publication'
),
info: { pubName, regionName }
}
)
}
const fromUrl = new URL(
from.private
? from.private.client.config.connection.connectionString
: from.public.client.config.connection.connectionString
)
const port = fromUrl.port ? fromUrl.port : '5432'
const fromDbName = fromUrl.pathname.replace('/', '')
const rawSqeel = `SELECT * FROM aiven_extras.pg_create_subscription(
?,
?,
?,
?,
TRUE,
TRUE
);`
try {
await to.public.raw('CREATE EXTENSION IF NOT EXISTS "aiven_extras"')
await to.public.raw(rawSqeel, [
subName,
`dbname=${fromDbName} host=${fromUrl.hostname} port=${port} sslmode=${sslmode} user=${fromUrl.username} password=${fromUrl.password}`,
pubName,
subName
])
} catch (err) {
if (!(err instanceof Error))
throw new DatabaseError(
'Could not create subscription {subName} to {pubName} when setting up project replication for region {regionName}',
to.public,
{
cause: ensureError(
sanitizeError(err),
'Unknown database error when creating subscription'
),
info: { subName, pubName, regionName }
}
)
if (
!err.message.includes('already exists') &&
!err.message.includes('duplicate key value violates unique constraint')
)
throw new DatabaseError(
'Unknown error while creating subscription {subName} to {pubName} when setting up project replication for region {regionName}',
to.public,
{
cause: ensureError(
sanitizeError(err),
'Unknown database error when creating subscription'
),
info: { subName, pubName, regionName }
}
)
}
}
const sanitizeError = (err: unknown): unknown => {
if (!err) return err
if ((get(err, 'where') as unknown as string).includes('password='))
return { ...err, where: '[REDACTED AS IT CONTAINS CONNECTION STRING]' }
}
export const resetRegisteredRegions = () => {
if (!isTestEnv()) {
throw new TestOnlyLogicError()
@@ -1,8 +1,8 @@
import { expect } from 'chai'
import { responseHandlerFactory } from '@/modules/previews/services/responses'
import { testLogger as logger } from '@/observability/logging'
import { buildConsumePreviewResult } from '@/modules/previews/resultListener'
import cryptoRandomString from 'crypto-random-string'
import { logger } from '@/observability/logging'
describe('object preview @previews', () => {
describe('responseHandlerFactory creates a function, that', () => {
+17 -10
View File
@@ -147,14 +147,6 @@ export const asMultiregionalOperation = async <T, K extends [Knex, ...Knex[]]>(
* @description reference to the main db (first one passed in the array)
*/
mainDb: Knex
/**
* @description reference for second db (first one not main)
*/
regionDb: Knex
/**
* @description reference for all regions (all dbs except the main one)
*/
regionDbs: Knex[]
emit: EventBusEmit
}) => MaybeAsync<T>,
params: {
@@ -209,8 +201,6 @@ export const asMultiregionalOperation = async <T, K extends [Knex, ...Knex[]]>(
result = await operation({
mainDb: mainDbTx,
allDbs: trxs,
regionDb: regionDbsTx[0],
regionDbs: regionDbsTx,
emit
})
@@ -267,3 +257,20 @@ export const asMultiregionalOperation = async <T, K extends [Knex, ...Knex[]]>(
}
)
}
/**
* Helper intended to be used with asMultiregionOperation that returns a curried function
* to apply a factory built with { db: Knex} to multiple dbs, with same input returning the first result.
* @param dbs Knex[]
* @param factory a function that recieves a db constructor
* @returns the result of the first database
*/
export function replicateFactory<Args extends unknown[], ReturnType>(
dbs: Knex[],
factory: (context: { db: Knex }) => (...args: Args) => Promise<ReturnType>
): (...args: Args) => Promise<ReturnType> {
return async (...args: Args): Promise<ReturnType> => {
const [result] = await Promise.all(dbs.map((db) => factory({ db })(...args)))
return result
}
}
@@ -176,9 +176,9 @@ isMultiRegionTestMode()
await manyParallelCreates()
const [{ count }] = await db('users').count()
expect(count).to.eql(1000)
expect(count).to.eql('1000')
await sleep(1000) // just in case
await sleep(50)
const connectionsUsedAfter = main.client.pool.numUsed()
expect(connectionsUsedAfter).to.be.lte(connectionsUsedBefore)
@@ -488,10 +488,10 @@ export type DenyWorkspaceJoinRequest = (
/**
* Updates project region and moves all regional data to target regional db
*/
export type UpdateProjectRegion = (params: {
export type MoveProjectToRegion = (params: {
projectId: string
regionKey: string
}) => Promise<Stream>
}) => Promise<void>
/**
* Given a count of objects successfully copied to another region, confirm that these counts
@@ -5,7 +5,6 @@ import { removePrivateFields } from '@/modules/core/helpers/userHelper'
import {
updateProjectFactory,
getStreamFactory,
deleteStreamFactory,
revokeStreamPermissionsFactory,
grantStreamPermissionsFactory,
getStreamCollaboratorsFactory,
@@ -151,9 +150,18 @@ import {
import { updateStreamRoleAndNotifyFactory } from '@/modules/core/services/streams/management'
import { getUserFactory, getUsersFactory } from '@/modules/core/repositories/users'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import { asOperation, commandFactory } from '@/modules/shared/command'
import {
asMultiregionalOperation,
asOperation,
commandFactory,
replicateFactory
} from '@/modules/shared/command'
import { throwIfRateLimitedFactory } from '@/modules/core/utils/ratelimiter'
import { getProjectDbClient, getRegionDb } from '@/modules/multiregion/utils/dbSelector'
import {
getAllRegisteredDbs,
getProjectDbClient,
getProjectReplicationDbs
} from '@/modules/multiregion/utils/dbSelector'
import {
listUserExpiredSsoSessionsFactory,
listWorkspaceSsoMembershipsByUserEmailFactory
@@ -180,7 +188,6 @@ import {
getWorkspaceWithPlanFactory,
upsertWorkspacePlanFactory
} from '@/modules/gatekeeper/repositories/billing'
import type { Knex } from 'knex'
import { getPaginatedItemsFactory } from '@/modules/shared/services/paginatedItems'
import { BadRequestError, UnauthorizedError } from '@/modules/shared/errors'
import {
@@ -193,6 +200,7 @@ import {
} from '@/modules/workspaces/repositories/workspaceJoinRequests'
import { sendWorkspaceJoinRequestReceivedEmailFactory } from '@/modules/workspaces/services/workspaceJoinRequestEmails/received'
import {
deleteProjectFactory,
getProjectFactory,
getUserProjectRolesFactory
} from '@/modules/core/repositories/projects'
@@ -229,10 +237,13 @@ import {
} from '@/modules/serverinvites/services/coreFinalization'
import { WorkspaceInvitesLimit } from '@/modules/workspaces/domain/constants'
import { copyWorkspaceFactory } from '@/modules/workspaces/repositories/projectRegions'
import { queryAllProjectsFactory } from '@/modules/core/services/projects'
import {
deleteProjectAndCommitsFactory,
queryAllProjectsFactory
} from '@/modules/core/services/projects'
import { WorkspacePlanNotFoundError } from '@/modules/gatekeeper/errors/billing'
import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits'
const eventBus = getEventBus()
const getServerInfo = getServerInfoFactory({ db })
const getUser = getUserFactory({ db })
const getUsers = getUsersFactory({ db })
@@ -805,38 +816,36 @@ export default FF_WORKSPACES_MODULE_ENABLED
}
}
const deleteWorkspaceFrom = (db: Knex) =>
deleteWorkspaceFactory({
deleteWorkspace: repoDeleteWorkspaceFactory({ db }),
deleteProject: deleteStreamFactory({ db }),
deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }),
queryAllProjects: queryAllProjectsFactory({
getExplicitProjects: getExplicitProjects({ db })
}),
deleteSsoProvider: deleteSsoProviderFactory({ db }),
emitWorkspaceEvent: getEventBus().emit
})
// this is a bit of an overhead, we are issuing delete queries to all regions,
// instead of being selective and clever about figuring out the project DB and only
// deleting from main and the project db
// while workspace must be deleted from all regions
// this should be turned into a get all regions and map over the regions...
const region = await getDefaultRegionFactory({ db })({ workspaceId })
if (region) {
const regionDb = await getRegionDb({ regionKey: region.key })
await withOperationLogging(
async () => await deleteWorkspaceFrom(regionDb)({ workspaceId }),
{
logger: logger.child({ regionKey: region.key }),
operationName: 'deleteWorkspaceFromRegion',
operationDescription: 'Delete workspace from region'
}
)
}
await withOperationLogging(
async () => await deleteWorkspaceFrom(db)({ workspaceId }),
await asMultiregionalOperation(
async ({ mainDb, allDbs, emit }) =>
deleteWorkspaceFactory({
deleteWorkspace: replicateFactory(allDbs, repoDeleteWorkspaceFactory),
deleteProjectAndCommits: deleteProjectAndCommitsFactory({
deleteProject: replicateFactory(allDbs, deleteProjectFactory),
deleteProjectCommits: replicateFactory(
allDbs,
deleteProjectCommitsFactory
)
}),
deleteAllResourceInvites: deleteAllResourceInvitesFactory({
db: mainDb
}),
queryAllProjects: queryAllProjectsFactory({
getExplicitProjects: getExplicitProjects({ db: mainDb })
}),
deleteSsoProvider: deleteSsoProviderFactory({ db: mainDb }),
emitWorkspaceEvent: emit
})({ workspaceId }),
{
logger,
operationName: 'deleteWorkspace',
operationDescription: 'Delete workspace'
name: 'delete workspace',
description: 'Delete workspace',
dbs: await getAllRegisteredDbs()
}
)
@@ -1606,63 +1615,76 @@ export default FF_WORKSPACES_MODULE_ENABLED
throw mapAuthToServerError(canMoveToWorkspace.error)
}
const moveProjectToWorkspace = commandFactory({
db,
eventBus,
operationFactory: ({ db, emit }) =>
const updatedProject = await asMultiregionalOperation(
({ mainDb, allDbs, emit }) =>
moveProjectToWorkspaceFactory({
getProject: getProjectFactory({ db }),
updateProject: updateProjectFactory({ db: projectDb }),
updateProjectRole: updateStreamRoleAndNotify,
getProjectCollaborators: getStreamCollaboratorsFactory({ db }),
getProject: getProjectFactory({ db: mainDb }),
updateProject: replicateFactory(allDbs, updateProjectFactory),
updateProjectRole: updateStreamRoleAndNotifyFactory({
isStreamCollaborator: isStreamCollaboratorFactory({
getStream: getStreamFactory({ db: mainDb })
}),
addOrUpdateStreamCollaborator: addOrUpdateStreamCollaboratorFactory({
validateStreamAccess,
getUser: getUserFactory({ db: mainDb }),
grantStreamPermissions: grantStreamPermissionsFactory({
db: mainDb
}),
getStreamRoles: getStreamRolesFactory({ db: mainDb }),
emitEvent: emit
}),
removeStreamCollaborator
}),
getProjectCollaborators: getStreamCollaboratorsFactory({ db: mainDb }),
copyWorkspace: copyWorkspaceFactory({
// TODO: must be removed when workspace replication is implemented
sourceDb: db,
targetDb: projectDb
}),
getWorkspaceRolesAndSeats: getWorkspaceRolesAndSeatsFactory({ db }),
getWorkspaceRolesAndSeats: getWorkspaceRolesAndSeatsFactory({
db: mainDb
}),
updateWorkspaceRole: addOrUpdateWorkspaceRoleFactory({
getWorkspaceRoles: getWorkspaceRolesFactory({ db }),
getWorkspaceWithDomains: getWorkspaceWithDomainsFactory({ db }),
findVerifiedEmailsByUserId: findVerifiedEmailsByUserIdFactory({
db
getWorkspaceRoles: getWorkspaceRolesFactory({ db: mainDb }),
getWorkspaceWithDomains: getWorkspaceWithDomainsFactory({
db: mainDb
}),
upsertWorkspaceRole: upsertWorkspaceRoleFactory({ db }),
findVerifiedEmailsByUserId: findVerifiedEmailsByUserIdFactory({
db: mainDb
}),
upsertWorkspaceRole: upsertWorkspaceRoleFactory({ db: mainDb }),
emitWorkspaceEvent: emit,
ensureValidWorkspaceRoleSeat: ensureValidWorkspaceRoleSeatFactory({
createWorkspaceSeat: createWorkspaceSeatFactory({ db }),
getWorkspaceUserSeat: getWorkspaceUserSeatFactory({ db }),
createWorkspaceSeat: createWorkspaceSeatFactory({ db: mainDb }),
getWorkspaceUserSeat: getWorkspaceUserSeatFactory({ db: mainDb }),
getWorkspaceDefaultSeatType: getWorkspaceDefaultSeatTypeFactory({
getWorkspace: getWorkspaceFactory({ db })
getWorkspace: getWorkspaceFactory({ db: mainDb })
}),
eventEmit: emit
}),
assignWorkspaceSeat: assignWorkspaceSeatFactory({
createWorkspaceSeat: createWorkspaceSeatFactory({ db }),
createWorkspaceSeat: createWorkspaceSeatFactory({ db: mainDb }),
getWorkspaceRoleForUser: getWorkspaceRoleForUserFactory({
db
db: mainDb
}),
eventEmit: emit,
getWorkspaceUserSeat: getWorkspaceUserSeatFactory({ db })
getWorkspaceUserSeat: getWorkspaceUserSeatFactory({ db: mainDb })
})
}),
createWorkspaceSeat: createWorkspaceSeatFactory({ db }),
getWorkspaceWithPlan: getWorkspaceWithPlanFactory({ db }),
getWorkspaceDomains: getWorkspaceDomainsFactory({ db }),
getUserEmails: findEmailsByUserIdFactory({ db })
})
})
const updatedProject = await withOperationLogging(
async () =>
await moveProjectToWorkspace({
createWorkspaceSeat: createWorkspaceSeatFactory({ db: mainDb }),
getWorkspaceWithPlan: getWorkspaceWithPlanFactory({ db: mainDb }),
getWorkspaceDomains: getWorkspaceDomainsFactory({ db: mainDb }),
getUserEmails: findEmailsByUserIdFactory({ db: mainDb })
})({
projectId,
workspaceId,
movedByUserId: context.userId!
}),
{
logger,
operationName: 'moveProjectToWorkspace',
operationDescription: 'Move project to workspace'
name: 'moveProjectToWorkspace',
description: 'Move project to workspace',
dbs: await getProjectReplicationDbs({ projectId })
}
)
+43 -19
View File
@@ -1,4 +1,5 @@
import type cron from 'node-cron'
import type { Logger } from '@/observability/logging'
import { moduleLogger } from '@/observability/logging'
import { getFeatureFlags } from '@/modules/shared/helpers/envHelper'
import { registerOrUpdateScopeFactory } from '@/modules/shared/repositories/scopes'
@@ -19,18 +20,21 @@ import {
} from '@/modules/core/repositories/scheduledTasks'
import { getWorkspacesNonCompleteFactory } from '@/modules/workspaces/repositories/workspaces'
import { deleteWorkspacesNonCompleteFactory } from '@/modules/workspaces/services/workspaceCreationState'
import {
deleteStreamFactory,
getExplicitProjects
} from '@/modules/core/repositories/streams'
import { getExplicitProjects } from '@/modules/core/repositories/streams'
import { deleteSsoProviderFactory } from '@/modules/workspaces/repositories/sso'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { deleteAllResourceInvitesFactory } from '@/modules/serverinvites/repositories/serverInvites'
import { deleteWorkspaceFactory as repoDeleteWorkspaceFactory } from '@/modules/workspaces/repositories/workspaces'
import { deleteWorkspaceFactory } from '@/modules/workspaces/services/management'
import { scheduleUpdateAllWorkspacesTracking } from '@/modules/workspaces/services/tracking'
import { getClient } from '@/modules/shared/utils/mixpanel'
import { queryAllProjectsFactory } from '@/modules/core/services/projects'
import {
deleteProjectAndCommitsFactory,
queryAllProjectsFactory
} from '@/modules/core/services/projects'
import { deleteProjectFactory } from '@/modules/core/repositories/projects'
import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import { getAllRegisteredDbs } from '@/modules/multiregion/utils/dbSelector'
const {
FF_WORKSPACES_MODULE_ENABLED,
@@ -56,19 +60,39 @@ const scheduleDeleteWorkspacesNonComplete = ({
}: {
scheduleExecution: ScheduleExecution
}) => {
const deleteWorkspacesNonComplete = deleteWorkspacesNonCompleteFactory({
getWorkspacesNonComplete: getWorkspacesNonCompleteFactory({ db }),
deleteWorkspace: deleteWorkspaceFactory({
deleteWorkspace: repoDeleteWorkspaceFactory({ db }),
deleteProject: deleteStreamFactory({ db }),
deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }),
queryAllProjects: queryAllProjectsFactory({
getExplicitProjects: getExplicitProjects({ db })
}),
deleteSsoProvider: deleteSsoProviderFactory({ db }),
emitWorkspaceEvent: getEventBus().emit
})
})
const deleteWorkspacesNonComplete = async ({ logger }: { logger: Logger }) =>
asMultiregionalOperation(
({ allDbs, mainDb, emit }) => {
const deleteWorkspacesNonComplete = deleteWorkspacesNonCompleteFactory({
getWorkspacesNonComplete: getWorkspacesNonCompleteFactory({ db: mainDb }),
deleteWorkspace: deleteWorkspaceFactory({
deleteWorkspace: replicateFactory(allDbs, repoDeleteWorkspaceFactory),
deleteProjectAndCommits: deleteProjectAndCommitsFactory({
deleteProject: replicateFactory(allDbs, deleteProjectFactory),
deleteProjectCommits: replicateFactory(
allDbs,
deleteProjectCommitsFactory
)
}),
deleteAllResourceInvites: deleteAllResourceInvitesFactory({
db: mainDb
}),
queryAllProjects: queryAllProjectsFactory({
getExplicitProjects: getExplicitProjects({ db: mainDb })
}),
deleteSsoProvider: deleteSsoProviderFactory({ db: mainDb }),
emitWorkspaceEvent: emit
})
})
return deleteWorkspacesNonComplete({ logger })
},
{
logger,
name: 'deleteWorkspacesNonComplete',
dbs: await getAllRegisteredDbs()
}
)
const every30Mins = '*/30 * * * *'
return scheduleExecution(
@@ -58,12 +58,14 @@ import { chunk, isEmpty, omit } from 'lodash-es'
import { userEmailsCompliantWithWorkspaceDomains } from '@/modules/workspaces/domain/logic'
import { workspaceRoles as workspaceRoleDefinitions } from '@/modules/workspaces/roles'
import { blockedDomains } from '@speckle/shared'
import type { DeleteStreamRecord } from '@/modules/core/domain/streams/operations'
import type {
DeleteSsoProvider,
GetWorkspaceSsoProviderRecord
} from '@/modules/workspaces/domain/sso/operations'
import type { QueryAllProjects } from '@/modules/core/domain/projects/operations'
import type {
DeleteProjectAndCommits,
QueryAllProjects
} from '@/modules/core/domain/projects/operations'
type WorkspaceCreateArgs = {
userId: string
@@ -288,14 +290,14 @@ type WorkspaceDeleteArgs = {
export const deleteWorkspaceFactory =
({
deleteWorkspace,
deleteProject,
deleteProjectAndCommits,
queryAllProjects,
deleteAllResourceInvites,
deleteSsoProvider,
emitWorkspaceEvent
}: {
deleteWorkspace: DeleteWorkspace
deleteProject: DeleteStreamRecord
deleteProjectAndCommits: DeleteProjectAndCommits
queryAllProjects: QueryAllProjects
deleteAllResourceInvites: DeleteAllResourceInvites
deleteSsoProvider: DeleteSsoProvider
@@ -328,7 +330,9 @@ export const deleteWorkspaceFactory =
// Workspace delete cascades-deletes stream table rows, but some manual cleanup is required
// We re-use `deleteStream` (and re-delete the project) to DRY this manual cleanup
for (const projectIdsChunk of chunk(projectIds, 25)) {
await Promise.all(projectIdsChunk.map((projectId) => deleteProject(projectId)))
await Promise.all(
projectIdsChunk.map((projectId) => deleteProjectAndCommits({ projectId }))
)
}
await emitWorkspaceEvent({
eventName: WorkspaceEvents.Deleted,
@@ -1,5 +1,4 @@
import type { GetProject } from '@/modules/core/domain/projects/operations'
import type { UpdateProjectRegionKey } from '@/modules/multiregion/services/projectRegion'
import type {
CopyProjectAutomations,
CopyProjectBlobs,
@@ -17,13 +16,13 @@ import type {
CountProjectVersions,
CountProjectWebhooks,
GetAvailableRegions,
UpdateProjectRegion,
MoveProjectToRegion,
ValidateProjectRegionCopy
} from '@/modules/workspaces/domain/operations'
import { ProjectRegionAssignmentError } from '@/modules/workspaces/errors/regions'
import { logger } from '@/observability/logging'
export const updateProjectRegionFactory =
export const moveProjectToRegionFactory =
(deps: {
getProject: GetProject
getAvailableRegions: GetAvailableRegions
@@ -37,8 +36,7 @@ export const updateProjectRegionFactory =
copyProjectWebhooks: CopyProjectWebhooks
copyProjectBlobs: CopyProjectBlobs
validateProjectRegionCopy: ValidateProjectRegionCopy
updateProjectRegionKey: UpdateProjectRegionKey
}): UpdateProjectRegion =>
}): MoveProjectToRegion =>
async (params) => {
const { projectId, regionKey } = params
@@ -120,9 +118,6 @@ export const updateProjectRegionFactory =
'Missing data from source project in target region copy after move.'
)
}
// Update project region in db and update relevant caches
return await deps.updateProjectRegionKey({ projectId, regionKey })
}
export const validateProjectRegionCopyFactory =
@@ -30,21 +30,15 @@ import { ProjectNotFoundError } from '@/modules/core/errors/projects'
import type { WorkspaceProjectCreateInput } from '@/modules/core/graph/generated/graphql'
import {
getDb,
getReplicationDbs,
getValidDefaultProjectRegionKey
} from '@/modules/multiregion/utils/dbSelector'
import { createNewProjectFactory } from '@/modules/core/services/projects'
import {
createNewProjectFactory,
waitForRegionProjectFactory
} from '@/modules/core/services/projects'
import {
deleteProjectFactory,
getProjectFactory,
storeProjectFactory,
storeProjectRoleFactory
} from '@/modules/core/repositories/projects'
import { mainDb } from '@/db/knex'
import { storeModelFactory } from '@/modules/core/repositories/models'
import { getEventBus } from '@/modules/shared/services/eventBus'
import {
getWorkspaceFactory,
upsertWorkspaceFactory
@@ -59,6 +53,8 @@ import type { FindEmailsByUserId } from '@/modules/core/domain/userEmails/operat
import { userEmailsCompliantWithWorkspaceDomains } from '@/modules/workspaces/domain/logic'
import type { CreateWorkspaceSeat } from '@/modules/gatekeeper/domain/operations'
import type { WorkspaceAcl } from '@/modules/workspacesCore/domain/types'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import { logger } from '@/observability/logging'
type MoveProjectToWorkspaceArgs = {
projectId: string
@@ -308,6 +304,7 @@ export const validateWorkspaceMemberProjectRoleFactory =
}
}
// This factory uses the command factory to create a new project in transactional (cross region) so it cannot be wrapped in another transaction
export const createWorkspaceProjectFactory =
(deps: { getDefaultRegion: GetDefaultRegion }) =>
async (params: { input: WorkspaceProjectCreateInput; ownerId: string }) => {
@@ -334,26 +331,28 @@ export const createWorkspaceProjectFactory =
if (!workspace) throw new WorkspaceNotFoundError()
await upsertWorkspaceFactory({ db: projectDb })({ workspace })
}
const project = await asMultiregionalOperation(
async ({ allDbs, mainDb, emit }) => {
const createNewProject = createNewProjectFactory({
// TODO: this goes as event emmits outside (default model)
storeProject: replicateFactory(allDbs, storeProjectFactory),
// THIS MUST GO TO THE MAIN DB
storeProjectRole: storeProjectRoleFactory({ db: mainDb }),
emitEvent: emit
})
// todo, use the command factory here, but for that, we need to migrate to the event bus
// deps not injected to ensure proper DB injection
const createNewProject = createNewProjectFactory({
storeProject: storeProjectFactory({ db: projectDb }),
storeModel: storeModelFactory({ db: projectDb }),
// THIS MUST GO TO THE MAIN DB
storeProjectRole: storeProjectRoleFactory({ db }),
waitForRegionProject: waitForRegionProjectFactory({
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db: projectDb })
}),
emitEvent: getEventBus().emit
})
const project = await createNewProject({
...input,
regionKey,
ownerId
})
return createNewProject({
...input,
regionKey,
ownerId
})
},
{
dbs: await getReplicationDbs({ regionKey }),
name: 'Create project workspace',
logger
}
)
return project
}
@@ -2,12 +2,7 @@ import { db } from '@/db/knex'
import { StreamAcl, Streams } from '@/modules/core/dbSchema'
import type { StreamRecord } from '@/modules/core/helpers/types'
import { ProjectRecordVisibility } from '@/modules/core/helpers/types'
import {
deleteProjectFactory,
getProjectFactory
} from '@/modules/core/repositories/projects'
import { grantStreamPermissionsFactory } from '@/modules/core/repositories/streams'
import { waitForRegionProjectFactory } from '@/modules/core/services/projects'
import { WorkspaceSeatType } from '@/modules/gatekeeper/domain/billing'
import { getWorkspaceUserSeatsFactory } from '@/modules/gatekeeper/repositories/workspaceSeat'
import { getRegionDb } from '@/modules/multiregion/utils/dbSelector'
@@ -1010,13 +1005,6 @@ describe('Workspace project GQL CRUD', () => {
// Simulate non-main default db region
const regionDb = await getRegionDb({ regionKey: 'region1' })
await tables.streams(regionDb).insert(regionalProject)
await waitForRegionProjectFactory({
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db: regionDb })
})({
projectId: regionalProject.id,
regionKey: 'region1'
})
await grantStreamPermissions({
streamId: regionalProject.id,
userId: serverAdminUser.id,
@@ -14,17 +14,21 @@ import {
import { expect } from 'chai'
import dayjs from 'dayjs'
import { deleteWorkspacesNonCompleteFactory } from '@/modules/workspaces/services/workspaceCreationState'
import type { Logger } from '@/observability/logging'
import { logger } from '@/observability/logging'
import {
deleteStreamFactory,
getExplicitProjects
} from '@/modules/core/repositories/streams'
import { getExplicitProjects } from '@/modules/core/repositories/streams'
import { deleteSsoProviderFactory } from '@/modules/workspaces/repositories/sso'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { deleteAllResourceInvitesFactory } from '@/modules/serverinvites/repositories/serverInvites'
import { deleteWorkspaceFactory as repoDeleteWorkspaceFactory } from '@/modules/workspaces/repositories/workspaces'
import { deleteWorkspaceFactory } from '@/modules/workspaces/services/management'
import { queryAllProjectsFactory } from '@/modules/core/services/projects'
import {
deleteProjectAndCommitsFactory,
queryAllProjectsFactory
} from '@/modules/core/services/projects'
import { deleteProjectFactory } from '@/modules/core/repositories/projects'
import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import { getAllRegisteredDbs } from '@/modules/multiregion/utils/dbSelector'
const updateAWorkspaceCreatedAt = async (
workspaceId: string,
@@ -39,19 +43,45 @@ const updateAWorkspaceCreatedAt = async (
describe('WorkspaceCreationState services', () => {
const getWorkspacesNonComplete = getWorkspacesNonCompleteFactory({ db })
const getWorkspace = getWorkspaceFactory({ db })
const deleteWorkspacesNonComplete = deleteWorkspacesNonCompleteFactory({
getWorkspacesNonComplete,
deleteWorkspace: deleteWorkspaceFactory({
deleteWorkspace: repoDeleteWorkspaceFactory({ db }),
deleteProject: deleteStreamFactory({ db }),
deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }),
queryAllProjects: queryAllProjectsFactory({
getExplicitProjects: getExplicitProjects({ db })
}),
deleteSsoProvider: deleteSsoProviderFactory({ db }),
emitWorkspaceEvent: getEventBus().emit
})
})
const deleteWorkspacesNonComplete = async ({ logger }: { logger: Logger }) =>
asMultiregionalOperation(
({ allDbs, mainDb, emit }) => {
const deleteWorkspacesNonComplete = deleteWorkspacesNonCompleteFactory({
getWorkspacesNonComplete: getWorkspacesNonCompleteFactory({ db: mainDb }),
deleteWorkspace: deleteWorkspaceFactory({
deleteWorkspace: async (...input) => {
const [res] = await Promise.all(
allDbs.map((db) => repoDeleteWorkspaceFactory({ db })(...input))
)
return res
},
deleteProjectAndCommits: deleteProjectAndCommitsFactory({
deleteProject: replicateFactory(allDbs, deleteProjectFactory),
deleteProjectCommits: replicateFactory(
allDbs,
deleteProjectCommitsFactory
)
}),
deleteAllResourceInvites: deleteAllResourceInvitesFactory({
db: mainDb
}),
queryAllProjects: queryAllProjectsFactory({
getExplicitProjects: getExplicitProjects({ db: mainDb })
}),
deleteSsoProvider: deleteSsoProviderFactory({ db: mainDb }),
emitWorkspaceEvent: emit
})
})
return deleteWorkspacesNonComplete({ logger })
},
{
logger,
name: 'deleteWorkspacesNonComplete',
dbs: await getAllRegisteredDbs()
}
)
let adminUser: BasicTestUser
let completeWorkspace: BasicTestWorkspace
+1 -1
View File
@@ -28,7 +28,7 @@
"ts-gqlgen": "tsx --import ./esmLoader.js ./bin/gqlgen",
"test": "cross-env TSX=true NODE_ENV=test LOG_FILTER=test LOG_PRETTY=true yarn ts-mocha",
"test:all-ff": "cross-env ENABLE_ALL_FFS=true yarn test",
"test:multiregion": "cross-env RUN_TESTS_IN_MULTIREGION_MODE=true FF_WORKSPACES_MODULE_ENABLED=true FF_WORKSPACES_MULTI_REGION_ENABLED=true yarn test --grep @multiregion",
"test:multiregion": "cross-env RUN_TESTS_IN_MULTIREGION_MODE=true FF_WORKSPACES_MODULE_ENABLED=true FF_WORKSPACES_MULTI_REGION_ENABLED=true FF_MOVE_PROJECT_REGION_ENABLED=true yarn test --grep @multiregion",
"test:no-ff": "cross-env DISABLE_ALL_FFS=true yarn test",
"test:coverage": "cross-env NODE_ENV=test LOG_FILTER=test LOG_PRETTY=true c8 yarn test",
"test:report": "MOCHA_FILE=reports/test-results.xml yarn test:coverage -- --reporter mocha-multi --reporter-options spec=-,mocha-junit-reporter=reports/test-results.xml",
+1 -2
View File
@@ -30,7 +30,6 @@ import {
import { collectAndValidateCoreTargetsFactory } from '@/modules/serverinvites/services/coreResourceCollection'
import { buildCoreInviteEmailContentsFactory } from '@/modules/serverinvites/services/coreEmailContents'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { createBranchFactory } from '@/modules/core/repositories/branches'
import {
getUsersFactory,
getUserFactory,
@@ -120,6 +119,7 @@ const buildFinalizeProjectInvite = () =>
getServerInfo
})
// This does not support multiregion
const createStream = legacyCreateStreamFactory({
createStreamReturnRecord: createStreamReturnRecordFactory({
inviteUsersToProject: inviteUsersToProjectFactory({
@@ -145,7 +145,6 @@ const createStream = legacyCreateStreamFactory({
}),
storeProjectRole: storeProjectRoleFactory({ db }),
createStream: createStreamFactory({ db }),
createBranch: createBranchFactory({ db }),
emitEvent: getEventBus().emit
})
})
+1 -2
View File
@@ -19,7 +19,6 @@ import {
getStreamRolesFactory,
grantStreamPermissionsFactory
} from '@/modules/core/repositories/streams'
import { createBranchFactory } from '@/modules/core/repositories/branches'
import {
finalizeInvitedServerRegistrationFactory,
finalizeResourceInviteFactory
@@ -98,6 +97,7 @@ const buildFinalizeProjectInvite = () =>
getServerInfo
})
// This is not supporting multiregion
export const createProject = createStreamReturnRecordFactory({
inviteUsersToProject: inviteUsersToProjectFactory({
createAndSendInvite: createAndSendInviteFactory({
@@ -121,7 +121,6 @@ export const createProject = createStreamReturnRecordFactory({
getUsers
}),
createStream: createStreamFactory({ db }),
createBranch: createBranchFactory({ db }),
storeProjectRole: storeProjectRoleFactory({ db }),
emitEvent: getEventBus().emit
})
@@ -2,7 +2,6 @@ import { db } from '@/db/knex'
import { StreamAcl } from '@/modules/core/dbSchema'
import { mapDbToGqlProjectVisibility } from '@/modules/core/helpers/project'
import type { StreamAclRecord, StreamRecord } from '@/modules/core/helpers/types'
import { createBranchFactory } from '@/modules/core/repositories/branches'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import {
createStreamFactory,
@@ -65,9 +64,12 @@ import type { StreamRoles } from '@speckle/shared'
import { ensureError, Roles } from '@speckle/shared'
import { omit } from 'lodash-es'
import { storeProjectRoleFactory } from '@/modules/core/repositories/projects'
import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command'
import { logger } from '@/observability/logging'
import type { LegacyCreateStream } from '@/modules/core/domain/streams/operations'
import { getReplicationDbs } from '@/modules/multiregion/utils/dbSelector'
const getServerInfo = getServerInfoFactory({ db })
const getUsers = getUsersFactory({ db })
const getUser = getUserFactory({ db })
const getStream = getStreamFactory({ db })
@@ -117,35 +119,42 @@ const buildFinalizeProjectInvite = () =>
getServerInfo
})
const createStream = legacyCreateStreamFactory({
createStreamReturnRecord: createStreamReturnRecordFactory({
inviteUsersToProject: inviteUsersToProjectFactory({
createAndSendInvite: createAndSendInviteFactory({
findUserByTarget: findUserByTargetFactory({ db }),
insertInviteAndDeleteOld: insertInviteAndDeleteOldFactory({ db }),
collectAndValidateResourceTargets: collectAndValidateCoreTargetsFactory({
getStream
}),
buildInviteEmailContents: buildCoreInviteEmailContentsFactory({
getStream
}),
emitEvent: ({ eventName, payload }) =>
getEventBus().emit({
eventName,
payload
const createStream: LegacyCreateStream = async (
stream: Parameters<LegacyCreateStream>[0] & { regionKey?: string }
) =>
asMultiregionalOperation(
async ({ allDbs, mainDb, emit }) =>
legacyCreateStreamFactory({
createStreamReturnRecord: createStreamReturnRecordFactory({
inviteUsersToProject: inviteUsersToProjectFactory({
createAndSendInvite: createAndSendInviteFactory({
findUserByTarget: findUserByTargetFactory({ db: mainDb }),
insertInviteAndDeleteOld: insertInviteAndDeleteOldFactory({ db: mainDb }),
collectAndValidateResourceTargets: collectAndValidateCoreTargetsFactory({
getStream: getStreamFactory({ db: mainDb })
}),
buildInviteEmailContents: buildCoreInviteEmailContentsFactory({
getStream: getStreamFactory({ db: mainDb })
}),
emitEvent: emit,
getUser: getUserFactory({ db: mainDb }),
getServerInfo: getServerInfoFactory({ db: mainDb }),
finalizeInvite: buildFinalizeProjectInvite()
}),
getUsers: getUsersFactory({ db: mainDb })
}),
getUser,
getServerInfo,
finalizeInvite: buildFinalizeProjectInvite()
}),
getUsers
}),
createStream: createStreamFactory({ db }),
createBranch: createBranchFactory({ db }),
storeProjectRole: storeProjectRoleFactory({ db }),
emitEvent: getEventBus().emit
})
})
createStream: replicateFactory(allDbs, createStreamFactory),
storeProjectRole: storeProjectRoleFactory({ db: mainDb }),
emitEvent: emit
})
})(stream),
{
name: 'create stream spec',
logger,
description: 'Creates a new stream',
dbs: await getReplicationDbs({ regionKey: stream.regionKey || null })
}
)
const validateStreamAccess = validateStreamAccessFactory({ authorizeResolver })
const isStreamCollaborator = isStreamCollaboratorFactory({
@@ -220,6 +229,7 @@ export async function createTestStream<S extends Partial<BasicTestStream>>(
},
ownerId: owner.id
})
id = newProject.id
} else {
id = await createStream({