From 399c998fd75fa10858d30dae0a71f52837bae7fb Mon Sep 17 00:00:00 2001 From: Daniel Gak Anagrov Date: Thu, 4 Sep 2025 12:07:19 +0100 Subject: [PATCH] feat(multiregion): apply prepared transactions to projects (#5322) * feat(multiregion): replace user replication * chore(multiregion): optimise replication * maybe it's this * postgres is fun * once more * chore(multiregion): only replicate test user creation during multiregion tests * feat: improved replicate_query logic * fix: minor * fix: starting issue * feat: included user create and delete specs to multiregion * feat: removed console logs * fix: user defaults * fix: multiregion test helper * fix: update scenarios for users * refactor(multiregion): swap replicateQuery concept to asMultiregionOperation (#5301) feat(multiregion): introduced asMultregionOperator, refactor test to user builder classes * chore: renamings * fix: remove comments * feat: remove user replication * refactor: simplified spec usages * chore: comments * chore: branches and favs * chore: more tests * chore: more tests * fix linting * fix tests * feat: dropping replication * refactor: moved project delete to service * fix: comment * feat: updateStreamFactory and updateProjectFacotry * deleteProjectFactory + replicateFactory * deleteWorkspaceFactory * fix: selector * fix: tests * fix tests, finished createStreamFactory * feat: simplify changes * fix: remove comment * fix: minor strucutres * fix: moveProjectToRegion * fix: moved branch creation outside of multiregion scope * fix: branch creation * fix: tests * fix: ci tests * fix: removed log form test * fix: on specs, no random regionKeys * review fixes * fix: mr comments * feat: removed test --------- Co-authored-by: Charles Driesler --- .../tests/integration/activitySummary.spec.ts | 32 +++- .../modules/cli/commands/download/project.ts | 15 +- .../modules/cli/commands/stream/clone.ts | 43 ++--- .../comments/tests/comments.graph.spec.ts | 18 +- .../modules/core/domain/commits/operations.ts | 1 + .../core/domain/projects/operations.ts | 2 +- .../modules/core/domain/streams/operations.ts | 6 - .../modules/core/events/projectListeners.ts | 36 ++++ .../modules/core/graph/resolvers/projects.ts | 146 +++++++-------- .../modules/core/graph/resolvers/streams.ts | 172 ++++++++++-------- .../modules/core/graph/resolvers/users.ts | 36 +++- packages/server/modules/core/index.ts | 6 + .../modules/core/repositories/commits.ts | 18 +- .../modules/core/repositories/streams.ts | 20 +- .../server/modules/core/services/projects.ts | 72 ++------ .../core/services/streams/management.ts | 31 +--- .../modules/core/services/users/management.ts | 14 +- .../modules/core/tests/helpers/creation.ts | 2 +- .../integration/projectRepositories.spec.ts | 18 +- .../core/tests/integration/subs.graph.spec.ts | 47 +++-- .../server/modules/core/tests/streams.spec.ts | 100 ++++++---- .../modules/core/tests/unit/projects.spec.ts | 155 +--------------- .../server/modules/core/tests/users.spec.ts | 20 +- .../modules/core/tests/usersAdmin.spec.ts | 63 +++++-- .../server/modules/cross-server-sync/index.ts | 14 +- .../multiregion/services/projectRegion.ts | 14 +- .../modules/multiregion/services/queue.ts | 72 ++++---- .../tests/e2e/serverAdmin.graph.spec.ts | 9 +- .../modules/multiregion/tests/helpers.ts | 21 ++- .../modules/multiregion/utils/dbSelector.ts | 167 +++++------------ .../previews/tests/unit/responses.spec.ts | 2 +- packages/server/modules/shared/command.ts | 27 ++- .../modules/shared/test/dbHelper.spec.ts | 4 +- .../modules/workspaces/domain/operations.ts | 4 +- .../workspaces/graph/resolvers/workspaces.ts | 156 +++++++++------- packages/server/modules/workspaces/index.ts | 62 +++++-- .../modules/workspaces/services/management.ts | 14 +- .../workspaces/services/projectRegions.ts | 11 +- .../modules/workspaces/services/projects.ts | 53 +++--- .../tests/integration/projects.graph.spec.ts | 12 -- .../workspacesCreationState.spec.ts | 68 +++++-- packages/server/package.json | 2 +- packages/server/scripts/streamObjects.ts | 3 +- packages/server/test/projectHelper.ts | 3 +- .../test/speckle-helpers/streamHelper.ts | 70 ++++--- 45 files changed, 923 insertions(+), 938 deletions(-) create mode 100644 packages/server/modules/core/events/projectListeners.ts diff --git a/packages/server/modules/activitystream/tests/integration/activitySummary.spec.ts b/packages/server/modules/activitystream/tests/integration/activitySummary.spec.ts index 3a74a6b5f..fe6996fc5 100644 --- a/packages/server/modules/activitystream/tests/integration/activitySummary.spec.ts +++ b/packages/server/modules/activitystream/tests/integration/activitySummary.spec.ts @@ -21,12 +21,16 @@ import { saveStreamActivityFactory } from '@/modules/activitystream/repositories' import { db } from '@/db/knex' -import { - deleteStreamFactory, - getStreamFactory -} from '@/modules/core/repositories/streams' +import { getStreamFactory } from '@/modules/core/repositories/streams' import { getUserFactory } from '@/modules/core/repositories/users' import { createTestStream } from '@/test/speckle-helpers/streamHelper' +import { deleteProjectAndCommitsFactory } from '@/modules/core/services/projects' +import { deleteProjectFactory } from '@/modules/core/repositories/projects' +import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits' +import type { DeleteProjectAndCommits } from '@/modules/core/domain/projects/operations' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' +import { logger } from '@/observability/logging' +import { getProjectReplicationDbs } from '@/modules/multiregion/utils/dbSelector' const cleanup = async () => { await truncateTables([StreamActivity.name, Users.name]) @@ -40,7 +44,22 @@ const createActivitySummary = createActivitySummaryFactory({ getActivity: geUserStreamActivityFactory({ db }), getUser }) -const deleteStream = deleteStreamFactory({ db }) +const deleteStreamAndCommits: DeleteProjectAndCommits = async ({ projectId }) => + asMultiregionalOperation( + async ({ allDbs }) => + // this is a bit of an overhead, we are issuing delete queries to all regions, + // instead of being selective and clever about figuring out the project DB and only + // deleting from main and the project db + deleteProjectAndCommitsFactory({ + deleteProject: replicateFactory(allDbs, deleteProjectFactory), + deleteProjectCommits: replicateFactory(allDbs, deleteProjectCommitsFactory) + })({ projectId }), + { + name: 'deleteStreamAndCommits spec', + logger, + dbs: await getProjectReplicationDbs({ projectId }) + } + ) describe('Activity summary @activity', () => { const userA: BasicTestUser = { @@ -104,6 +123,7 @@ describe('Activity summary @activity', () => { async (stream) => (await createTestStream(stream, userA)).id ) ) + await saveActivity({ streamId, resourceType: StreamResourceTypes.Stream, @@ -113,7 +133,7 @@ describe('Activity summary @activity', () => { info: {}, message: 'foo' }) - await deleteStream(streamId) + await deleteStreamAndCommits({ projectId: streamId }) const summary = await createActivitySummary({ userId: userA.id, streamIds: [streamId], diff --git a/packages/server/modules/cli/commands/download/project.ts b/packages/server/modules/cli/commands/download/project.ts index 58b51a37b..e89c5b65f 100644 --- a/packages/server/modules/cli/commands/download/project.ts +++ b/packages/server/modules/cli/commands/download/project.ts @@ -57,17 +57,11 @@ import { authorizeResolver } from '@/modules/shared' import { Roles } from '@speckle/shared' import { getDefaultRegionFactory } from '@/modules/workspaces/repositories/regions' import { getDb } from '@/modules/multiregion/utils/dbSelector' +import { createNewProjectFactory } from '@/modules/core/services/projects' import { - createNewProjectFactory, - waitForRegionProjectFactory -} from '@/modules/core/services/projects' -import { - deleteProjectFactory, - getProjectFactory, storeProjectFactory, storeProjectRoleFactory } from '@/modules/core/repositories/projects' -import { storeModelFactory } from '@/modules/core/repositories/models' import { getEventBus } from '@/modules/shared/services/eventBus' import { getViewerResourceGroupsFactory, @@ -202,14 +196,11 @@ const command: CommandModule< const getUser = getUserFactory({ db }) const createNewProject = createNewProjectFactory({ + // TODO: this goes as event emmits outside (default model) + // This does not support multiregion storeProject: storeProjectFactory({ db: projectDb }), - storeModel: storeModelFactory({ db: projectDb }), // THIS MUST GO TO THE MAIN DB storeProjectRole: storeProjectRoleFactory({ db }), - waitForRegionProject: waitForRegionProjectFactory({ - getProject: getProjectFactory({ db: projectDb }), - deleteProject: deleteProjectFactory({ db: projectDb }) - }), emitEvent: getEventBus().emit }) diff --git a/packages/server/modules/cli/commands/stream/clone.ts b/packages/server/modules/cli/commands/stream/clone.ts index 24418c4e6..d1b163541 100644 --- a/packages/server/modules/cli/commands/stream/clone.ts +++ b/packages/server/modules/cli/commands/stream/clone.ts @@ -23,8 +23,9 @@ import { import { getUserFactory } from '@/modules/core/repositories/users' import { cloneStreamFactory } from '@/modules/core/services/streams/clone' import type { CommandModule } from 'yargs' -import { asOperation } from '@/modules/shared/command' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' import { storeProjectRoleFactory } from '@/modules/core/repositories/projects' +import { db } from '@/db/knex' const command: CommandModule< unknown, @@ -48,34 +49,34 @@ const command: CommandModule< logger.info( `Cloning stream ${sourceStreamId} into the account of user ${targetUserId}...` ) - const { id } = await asOperation( - ({ emit, db }) => { + const { id } = await asMultiregionalOperation( + ({ emit, mainDb, allDbs }) => { const cloneStream = cloneStreamFactory({ - getStream: getStreamFactory({ db }), - getUser: getUserFactory({ db }), - newProjectDb: db, - sourceProjectDb: db, - createStream: createStreamFactory({ db }), - insertCommits: insertCommitsFactory({ db }), - getBatchedStreamCommits: getBatchedStreamCommitsFactory({ db }), - insertStreamCommits: insertStreamCommitsFactory({ db }), - getBatchedStreamBranches: getBatchedStreamBranchesFactory({ db }), - insertBranches: insertBranchesFactory({ db }), - getBatchedBranchCommits: getBatchedBranchCommitsFactory({ db }), - insertBranchCommits: insertBranchCommitsFactory({ db }), - getBatchedStreamComments: getBatchedStreamCommentsFactory({ db }), - insertComments: insertCommentsFactory({ db }), - getCommentLinks: getCommentLinksFactory({ db }), - insertCommentLinks: insertCommentLinksFactory({ db }), + getStream: getStreamFactory({ db: mainDb }), + getUser: getUserFactory({ db: mainDb }), + newProjectDb: mainDb, + sourceProjectDb: mainDb, + createStream: replicateFactory(allDbs, createStreamFactory), + insertCommits: insertCommitsFactory({ db: mainDb }), + getBatchedStreamCommits: getBatchedStreamCommitsFactory({ db: mainDb }), + insertStreamCommits: insertStreamCommitsFactory({ db: mainDb }), + getBatchedStreamBranches: getBatchedStreamBranchesFactory({ db: mainDb }), + insertBranches: insertBranchesFactory({ db: mainDb }), + getBatchedBranchCommits: getBatchedBranchCommitsFactory({ db: mainDb }), + insertBranchCommits: insertBranchCommitsFactory({ db: mainDb }), + getBatchedStreamComments: getBatchedStreamCommentsFactory({ db: mainDb }), + insertComments: insertCommentsFactory({ db: mainDb }), + getCommentLinks: getCommentLinksFactory({ db: mainDb }), + insertCommentLinks: insertCommentLinksFactory({ db: mainDb }), emitEvent: emit, - storeProjectRole: storeProjectRoleFactory({ db }) + storeProjectRole: storeProjectRoleFactory({ db: mainDb }) }) return cloneStream(targetUserId, sourceStreamId) }, { - transaction: true, name: 'Clone Stream', + dbs: [db], // Cloning does not support multiregion logger } ) diff --git a/packages/server/modules/comments/tests/comments.graph.spec.ts b/packages/server/modules/comments/tests/comments.graph.spec.ts index 1395e46b8..c99d76553 100644 --- a/packages/server/modules/comments/tests/comments.graph.spec.ts +++ b/packages/server/modules/comments/tests/comments.graph.spec.ts @@ -52,7 +52,6 @@ import { storeSingleObjectIfNotFoundFactory, getStreamObjectsFactory } from '@/modules/core/repositories/objects' -import { legacyUpdateStreamFactory } from '@/modules/core/services/streams/management' import { createObjectFactory } from '@/modules/core/services/objects/management' import { getViewerResourcesFromLegacyIdentifiersFactory, @@ -63,6 +62,10 @@ import { createProject } from '@/test/projectHelper' import type { BasicTestUser } from '@/test/authHelper' import { createTestUser } from '@/test/authHelper' import { getEventBus } from '@/modules/shared/services/eventBus' +import type { UpdateStreamRecord } from '@/modules/core/domain/streams/operations' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' +import { logger } from '@/observability/logging' +import { getProjectReplicationDbs } from '@/modules/multiregion/utils/dbSelector' const markCommitStreamUpdated = markCommitStreamUpdatedFactory({ db }) const streamResourceCheck = streamResourceCheckFactory({ @@ -112,9 +115,16 @@ const createCommitByBranchName = createCommitByBranchNameFactory({ getBranchById: getBranchByIdFactory({ db }) }) -const updateStream = legacyUpdateStreamFactory({ - updateStream: updateStreamFactory({ db }) -}) +const updateStream: UpdateStreamRecord = async (update) => + asMultiregionalOperation( + async ({ allDbs }) => replicateFactory(allDbs, updateStreamFactory)(update), + { + logger, + name: 'updateStream', + dbs: await getProjectReplicationDbs({ projectId: update.id }) + } + ) + const grantPermissionsStream = grantStreamPermissionsFactory({ db }) const createObject = createObjectFactory({ diff --git a/packages/server/modules/core/domain/commits/operations.ts b/packages/server/modules/core/domain/commits/operations.ts index ace1407c4..7310009d3 100644 --- a/packages/server/modules/core/domain/commits/operations.ts +++ b/packages/server/modules/core/domain/commits/operations.ts @@ -45,6 +45,7 @@ export type GetCommit = ( export type DeleteCommits = (commitIds: string[]) => Promise export type DeleteCommit = (commitId: string) => Promise +export type DeleteProjectCommits = (params: { projectId: string }) => Promise export type DeleteCommitAndNotify = ( commitId: string, diff --git a/packages/server/modules/core/domain/projects/operations.ts b/packages/server/modules/core/domain/projects/operations.ts index 87aae1145..244efbd08 100644 --- a/packages/server/modules/core/domain/projects/operations.ts +++ b/packages/server/modules/core/domain/projects/operations.ts @@ -40,6 +40,7 @@ export type DeleteProjectRole = (args: { }) => Promise export type DeleteProject = (args: { projectId: string }) => Promise +export type DeleteProjectAndCommits = (args: { projectId: string }) => Promise export type GetUserProjectRoles = ({ userId, @@ -61,7 +62,6 @@ export type ProjectCreateArgs = { } export type CreateProject = (params: ProjectCreateArgs) => Promise - export type StoreProject = (params: { project: Project }) => Promise export type StoreModel = (params: { diff --git a/packages/server/modules/core/domain/streams/operations.ts b/packages/server/modules/core/domain/streams/operations.ts index eb92b5cc8..65f7baaff 100644 --- a/packages/server/modules/core/domain/streams/operations.ts +++ b/packages/server/modules/core/domain/streams/operations.ts @@ -152,8 +152,6 @@ export type CanUserFavoriteStream = (params: { streamId: string }) => Promise -export type DeleteStreamRecord = (streamId: string) => Promise - export type GetOnboardingBaseStream = (version: string) => Promise> export type UpdateStreamRecord = ( @@ -360,10 +358,6 @@ export type UpdateStream = ( updaterId: string ) => Promise -export type LegacyUpdateStream = ( - update: StreamUpdateInput -) => Promise> - export type PermissionUpdateInput = | StreamUpdatePermissionInput | StreamRevokePermissionInput diff --git a/packages/server/modules/core/events/projectListeners.ts b/packages/server/modules/core/events/projectListeners.ts new file mode 100644 index 000000000..0e4aa1320 --- /dev/null +++ b/packages/server/modules/core/events/projectListeners.ts @@ -0,0 +1,36 @@ +import type { EventBus, EventPayload } from '@/modules/shared/services/eventBus' +import { ProjectEvents } from '@/modules/core/domain/projects/events' +import type { Logger } from '@/observability/logging' +import type { DependenciesOf } from '@/modules/shared/helpers/factory' +import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' +import { storeModelFactory } from '@/modules/core/repositories/models' + +const onProjectCreatedFactory = + () => + async ({ payload }: EventPayload) => { + const { project, ownerId } = payload + const projectDb = await getProjectDbClient({ projectId: project.id }) + const storeModel = storeModelFactory({ db: projectDb }) + + // Legacy flow for creating a default main branch + await storeModel({ + name: 'main', + description: 'default model', + projectId: project.id, + authorId: ownerId + }) + } + +export const projectListenersFactory = + ( + deps: { eventBus: EventBus; logger: Logger } & DependenciesOf< + typeof onProjectCreatedFactory + > + ) => + () => { + const onProjectCreated = onProjectCreatedFactory() + + const cbs = [deps.eventBus.listen(ProjectEvents.Created, onProjectCreated)] + + return () => cbs.forEach((cb) => cb()) + } diff --git a/packages/server/modules/core/graph/resolvers/projects.ts b/packages/server/modules/core/graph/resolvers/projects.ts index 015321f27..8dccd2252 100644 --- a/packages/server/modules/core/graph/resolvers/projects.ts +++ b/packages/server/modules/core/graph/resolvers/projects.ts @@ -16,21 +16,19 @@ import { toProjectIdWhitelist } from '@/modules/core/helpers/token' import { - createBranchFactory, getBatchedStreamBranchesFactory, insertBranchesFactory } from '@/modules/core/repositories/branches' import { + deleteProjectCommitsFactory, getBatchedBranchCommitsFactory, getBatchedStreamCommitsFactory, insertBranchCommitsFactory, insertCommitsFactory, insertStreamCommitsFactory } from '@/modules/core/repositories/commits' -import { storeModelFactory } from '@/modules/core/repositories/models' import { deleteProjectFactory, - getProjectFactory, storeProjectFactory, storeProjectRoleFactory } from '@/modules/core/repositories/projects' @@ -38,7 +36,6 @@ import { getServerInfoFactory } from '@/modules/core/repositories/server' import { getStreamFactory, createStreamFactory, - deleteStreamFactory, updateStreamFactory, revokeStreamPermissionsFactory, grantStreamPermissionsFactory, @@ -50,7 +47,7 @@ import { import { getUserFactory, getUsersFactory } from '@/modules/core/repositories/users' import { createNewProjectFactory, - waitForRegionProjectFactory + deleteProjectAndCommitsFactory } from '@/modules/core/services/projects' import { throwIfRateLimitedFactory } from '@/modules/core/utils/ratelimiter' import { @@ -69,8 +66,9 @@ import { import { createOnboardingStreamFactory } from '@/modules/core/services/streams/onboarding' import { getOnboardingBaseProjectFactory } from '@/modules/cross-server-sync/services/onboardingProject' import { - getDb, getProjectDbClient, + getProjectReplicationDbs, + getReplicationDbs, getValidDefaultProjectRegionKey } from '@/modules/multiregion/utils/dbSelector' import { @@ -119,8 +117,9 @@ import { sendEmail } from '@/modules/emails/services/sending' import { ProjectRecordVisibility } from '@/modules/core/helpers/types' import { mapDbToGqlProjectVisibility } from '@/modules/core/helpers/project' import { StreamNotFoundError } from '@/modules/core/errors/stream' -import { asOperation } from '@/modules/shared/command' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' import type { Knex } from 'knex' +import type { Logger } from '@/observability/logging' const getUser = getUserFactory({ db }) const getStream = getStreamFactory({ db }) @@ -200,6 +199,32 @@ const throwIfRateLimited = throwIfRateLimitedFactory({ rateLimiterEnabled: isRateLimiterEnabled() }) +const deleteStreamAndNotify = async ( + projectId: string, + userId: string, + ctxLogger: Logger +) => + asMultiregionalOperation( + ({ allDbs, mainDb, emit }) => { + const deleteStreamAndNotify = deleteStreamAndNotifyFactory({ + deleteProjectAndCommits: deleteProjectAndCommitsFactory({ + deleteProject: replicateFactory(allDbs, deleteProjectFactory), + deleteProjectCommits: replicateFactory(allDbs, deleteProjectCommitsFactory) + }), + emitEvent: emit, + deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db: mainDb }), + getStream: getStreamFactory({ db: mainDb }) + }) + return deleteStreamAndNotify(projectId, userId) + }, + { + logger: ctxLogger, + name: 'delete project', + description: `Cascade deleting a project`, + dbs: await getProjectReplicationDbs({ projectId }) + } + ) + const resolvers: Resolvers = { Query: { async project(_parent, args, context) { @@ -268,27 +293,8 @@ const resolvers: Resolvers = { }) ) - const results = await withOperationLogging( - async () => - await Promise.all( - args.ids.map(async (id) => { - const projectDb = await getProjectDbClient({ projectId: id }) - const deleteStreamAndNotify = deleteStreamAndNotifyFactory({ - deleteStream: deleteStreamFactory({ - db: projectDb - }), - emitEvent: getEventBus().emit, - deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }), - getStream: getStreamFactory({ db: projectDb }) - }) - return deleteStreamAndNotify(id, ctx.userId!) - }) - ), - { - logger: ctx.log, - operationName: 'projectBatchDelete', - operationDescription: `Delete multiple projects` - } + const results = await Promise.all( + args.ids.map((id) => deleteStreamAndNotify(id, ctx.userId!, ctx.log)) ) return results.every((res) => res === true) }, @@ -314,27 +320,11 @@ const resolvers: Resolvers = { }) throwIfAuthNotOk(canDelete) - const projectDb = await getProjectDbClient({ projectId }) - const deleteStreamAndNotify = deleteStreamAndNotifyFactory({ - deleteStream: deleteStreamFactory({ - db: projectDb - }), - emitEvent: getEventBus().emit, - deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }), - getStream: getStreamFactory({ db: projectDb }) - }) - return await withOperationLogging( - async () => await deleteStreamAndNotify(projectId, userId!), - { - logger, - operationName: 'projectDelete', - operationDescription: `Delete a project` - } - ) + return deleteStreamAndNotify(projectId, userId!, logger) }, async createForOnboarding(_parent, _args, { userId, resourceAccessRules, log }) { - return await asOperation( - async ({ db: mainDb, emit }) => { + return await asMultiregionalOperation( + async ({ mainDb, emit, allDbs }) => { // We want to read & write from main DB - this isn't occurring in a multi region workspace ctx const createOnboardingStream = createOnboardingStreamFactory({ getOnboardingBaseProject: getOnboardingBaseProjectFactory({ @@ -345,7 +335,7 @@ const resolvers: Resolvers = { getUser: getUserFactory({ db: mainDb }), newProjectDb: mainDb, sourceProjectDb: mainDb, - createStream: createStreamFactory({ db: mainDb }), + createStream: replicateFactory(allDbs, createStreamFactory), insertCommits: insertCommitsFactory({ db: mainDb }), getBatchedStreamCommits: getBatchedStreamCommitsFactory({ db: mainDb }), insertStreamCommits: insertStreamCommitsFactory({ db: mainDb }), @@ -381,13 +371,12 @@ const resolvers: Resolvers = { }), getUsers: getUsersFactory({ db: mainDb }) }), - createStream: createStreamFactory({ db: mainDb }), - createBranch: createBranchFactory({ db: mainDb }), + createStream: replicateFactory(allDbs, createStreamFactory), storeProjectRole: storeProjectRoleFactory({ db: mainDb }), emitEvent: emit }), getUser: getUserFactory({ db: mainDb }), - updateStream: updateStreamFactory({ db: mainDb }) + updateStream: replicateFactory(allDbs, updateStreamFactory) }) return await createOnboardingStream({ @@ -399,6 +388,7 @@ const resolvers: Resolvers = { { logger: log, name: 'createOnboardingProject', + dbs: [db], // Cloning does not support multiregion description: `Create a project for onboarding` } ) @@ -426,18 +416,20 @@ const resolvers: Resolvers = { }) throwIfAuthNotOk(canUpdate) - const projectDB = await getProjectDbClient({ projectId }) - const updateStreamAndNotify = updateStreamAndNotifyFactory({ - getStream: getStreamFactory({ db: projectDB }), - updateStream: updateStreamFactory({ db: projectDB }), - emitEvent: getEventBus().emit - }) - const res = await withOperationLogging( - async () => await updateStreamAndNotify(update, userId!), + const res = await asMultiregionalOperation( + async ({ mainDb, allDbs, emit }) => { + const updateStreamAndNotify = updateStreamAndNotifyFactory({ + getStream: getStreamFactory({ db: mainDb }), + updateStream: replicateFactory(allDbs, updateStreamFactory), + emitEvent: emit + }) + + return await updateStreamAndNotify(update, userId!) + }, { logger, - operationName: 'projectUpdate', - operationDescription: `Update a project` + name: 'Update Project', + dbs: await getProjectReplicationDbs({ projectId }) } ) @@ -465,31 +457,25 @@ const resolvers: Resolvers = { throwIfAuthNotOk(canCreate) const regionKey = await getValidDefaultProjectRegionKey() - const projectDb = await getDb({ regionKey }) + const project = await asMultiregionalOperation( + async ({ allDbs, mainDb, emit }) => { + const createNewProject = createNewProjectFactory({ + storeProject: replicateFactory(allDbs, storeProjectFactory), + storeProjectRole: storeProjectRoleFactory({ db: mainDb }), + emitEvent: emit + }) - const createNewProject = createNewProjectFactory({ - storeProject: storeProjectFactory({ db: projectDb }), - storeModel: storeModelFactory({ db: projectDb }), - // THIS MUST GO TO THE MAIN DB - storeProjectRole: storeProjectRoleFactory({ db }), - waitForRegionProject: waitForRegionProjectFactory({ - getProject: getProjectFactory({ db }), - deleteProject: deleteProjectFactory({ db: projectDb }) - }), - emitEvent: getEventBus().emit - }) - - const project = await withOperationLogging( - async () => - await createNewProject({ + return createNewProject({ ...(args.input || {}), ownerId: context.userId!, regionKey - }), + }) + }, { logger, - operationName: 'projectCreate', - operationDescription: `Create a new project` + name: 'projectCreate', + dbs: await getReplicationDbs({ regionKey }), + description: `Create a new project` } ) diff --git a/packages/server/modules/core/graph/resolvers/streams.ts b/packages/server/modules/core/graph/resolvers/streams.ts index 3da4dad68..25a3eca6f 100644 --- a/packages/server/modules/core/graph/resolvers/streams.ts +++ b/packages/server/modules/core/graph/resolvers/streams.ts @@ -13,7 +13,6 @@ import { get } from 'lodash-es' import { getStreamFactory, createStreamFactory, - deleteStreamFactory, updateStreamFactory, revokeStreamPermissionsFactory, grantStreamPermissionsFactory, @@ -64,7 +63,6 @@ import { createAndSendInviteFactory } from '@/modules/serverinvites/services/cre import { collectAndValidateCoreTargetsFactory } from '@/modules/serverinvites/services/coreResourceCollection' import { buildCoreInviteEmailContentsFactory } from '@/modules/serverinvites/services/coreEmailContents' import { getEventBus } from '@/modules/shared/services/eventBus' -import { createBranchFactory } from '@/modules/core/repositories/branches' import { addOrUpdateStreamCollaboratorFactory, isStreamCollaboratorFactory, @@ -103,7 +101,15 @@ import { renderEmail } from '@/modules/emails/services/emailRendering' import { sendEmail } from '@/modules/emails/services/sending' import { ProjectRecordVisibility } from '@/modules/core/helpers/types' import { throwIfAuthNotOk } from '@/modules/shared/helpers/errorHelper' -import { storeProjectRoleFactory } from '@/modules/core/repositories/projects' +import { + deleteProjectFactory, + storeProjectRoleFactory +} from '@/modules/core/repositories/projects' +import { deleteProjectAndCommitsFactory } from '@/modules/core/services/projects' +import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' +import { getProjectReplicationDbs } from '@/modules/multiregion/utils/dbSelector' +import type { Logger } from '@/observability/logging' const getServerInfo = getServerInfoFactory({ db }) const getUsers = getUsersFactory({ db }) @@ -160,44 +166,32 @@ const buildFinalizeProjectInvite = () => getServerInfo }) -const createStreamReturnRecord = createStreamReturnRecordFactory({ - inviteUsersToProject: inviteUsersToProjectFactory({ - createAndSendInvite: createAndSendInviteFactory({ - findUserByTarget: findUserByTargetFactory({ db }), - insertInviteAndDeleteOld: insertInviteAndDeleteOldFactory({ db }), - collectAndValidateResourceTargets: collectAndValidateCoreTargetsFactory({ - getStream - }), - buildInviteEmailContents: buildCoreInviteEmailContentsFactory({ - getStream - }), - emitEvent: ({ eventName, payload }) => - getEventBus().emit({ - eventName, - payload +const deleteStreamAndNotify = async ( + projectId: string, + userId: string, + ctxLogger: Logger +) => + asMultiregionalOperation( + ({ allDbs, mainDb, emit }) => { + const deleteStreamAndNotify = deleteStreamAndNotifyFactory({ + deleteProjectAndCommits: deleteProjectAndCommitsFactory({ + deleteProject: replicateFactory(allDbs, deleteProjectFactory), + deleteProjectCommits: replicateFactory(allDbs, deleteProjectCommitsFactory) }), - getUser, - getServerInfo, - finalizeInvite: buildFinalizeProjectInvite() - }), - getUsers - }), - createStream: createStreamFactory({ db }), - createBranch: createBranchFactory({ db }), - storeProjectRole: storeProjectRoleFactory({ db }), - emitEvent: getEventBus().emit -}) -const deleteStreamAndNotify = deleteStreamAndNotifyFactory({ - deleteStream: deleteStreamFactory({ db }), - emitEvent: getEventBus().emit, - deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }), - getStream -}) -const updateStreamAndNotify = updateStreamAndNotifyFactory({ - getStream, - updateStream: updateStreamFactory({ db }), - emitEvent: getEventBus().emit -}) + emitEvent: emit, + deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db: mainDb }), + getStream: getStreamFactory({ db: mainDb }) + }) + return deleteStreamAndNotify(projectId, userId) + }, + { + logger: ctxLogger, + name: 'delete project', + description: `Cascade deleting a project`, + dbs: await getProjectReplicationDbs({ projectId }) + } + ) + const validateStreamAccess = validateStreamAccessFactory({ authorizeResolver }) const isStreamCollaborator = isStreamCollaboratorFactory({ getStream @@ -513,17 +507,43 @@ export default { }) throwIfAuthNotOk(canCreate) - const { id } = await withOperationLogging( - async () => - await createStreamReturnRecord({ + const { id } = await asMultiregionalOperation( + async ({ allDbs, mainDb, emit }) => + createStreamReturnRecordFactory({ + inviteUsersToProject: inviteUsersToProjectFactory({ + createAndSendInvite: createAndSendInviteFactory({ + findUserByTarget: findUserByTargetFactory({ db: mainDb }), + insertInviteAndDeleteOld: insertInviteAndDeleteOldFactory({ + db: mainDb + }), + collectAndValidateResourceTargets: collectAndValidateCoreTargetsFactory( + { + getStream: getStreamFactory({ db: mainDb }) + } + ), + buildInviteEmailContents: buildCoreInviteEmailContentsFactory({ + getStream: getStreamFactory({ db: mainDb }) + }), + emitEvent: emit, + getUser: getUserFactory({ db: mainDb }), + getServerInfo: getServerInfoFactory({ db: mainDb }), + finalizeInvite: buildFinalizeProjectInvite() + }), + getUsers: getUsersFactory({ db: mainDb }) + }), + createStream: replicateFactory(allDbs, createStreamFactory), + storeProjectRole: storeProjectRoleFactory({ db: mainDb }), + emitEvent: emit + })({ ...args.stream, ownerId: context.userId!, ownerResourceAccessRules: context.resourceAccessRules }), { logger: context.log, - operationName: 'createStream', - operationDescription: `Create a new Stream` + name: 'createStream', + description: `Create a new Stream`, + dbs: [db] // legacy; no multiregion ctx } ) @@ -549,12 +569,20 @@ export default { streamId: projectId //legacy }) - await withOperationLogging( - async () => await updateStreamAndNotify(args.stream, context.userId!), + await asMultiregionalOperation( + async ({ mainDb, allDbs, emit }) => { + const updateStreamAndNotify = updateStreamAndNotifyFactory({ + getStream: getStreamFactory({ db: mainDb }), + updateStream: replicateFactory(allDbs, updateStreamFactory), + emitEvent: emit + }) + + await updateStreamAndNotify(args.stream, context.userId!) + }, { logger, - operationName: 'updateStream', - operationDescription: `Update a Stream` + name: 'updateStream', + dbs: await getProjectReplicationDbs({ projectId }) } ) return true @@ -579,43 +607,29 @@ export default { streamId: projectId //legacy }) - return await withOperationLogging( - async () => await deleteStreamAndNotify(args.id, context.userId!), - { - logger, - operationName: 'deleteStream', - operationDescription: `Delete a Stream` - } - ) + return await deleteStreamAndNotify(args.id, context.userId!, logger) }, async streamsDelete(_, args, context) { const logger = context.log - const results = await withOperationLogging( - async () => - await Promise.all( - (args.ids || []).map(async (id) => { - throwIfResourceAccessNotAllowed({ - resourceId: id, - resourceType: TokenResourceIdentifierType.Project, - resourceAccessRules: context.resourceAccessRules - }) - const canDelete = await context.authPolicies.project.canDelete({ - userId: context.userId!, - projectId: id - }) - throwIfAuthNotOk(canDelete) + const results = await Promise.all( + (args.ids || []).map(async (id) => { + throwIfResourceAccessNotAllowed({ + resourceId: id, + resourceType: TokenResourceIdentifierType.Project, + resourceAccessRules: context.resourceAccessRules + }) + const canDelete = await context.authPolicies.project.canDelete({ + userId: context.userId!, + projectId: id + }) + throwIfAuthNotOk(canDelete) - return await deleteStreamAndNotify(id, context.userId!) - }) - ), - { - logger, - operationName: 'deleteStreams', - operationDescription: `Delete one or more Streams` - } + return await deleteStreamAndNotify(id, context.userId!, logger) + }) ) + return results.every((res) => res === true) }, diff --git a/packages/server/modules/core/graph/resolvers/users.ts b/packages/server/modules/core/graph/resolvers/users.ts index be137e83c..b1591b0b5 100644 --- a/packages/server/modules/core/graph/resolvers/users.ts +++ b/packages/server/modules/core/graph/resolvers/users.ts @@ -32,7 +32,6 @@ import { changeUserRoleFactory } from '@/modules/core/services/users/management' import { - deleteStreamFactory, getExplicitProjects, getUserDeletableStreamsFactory } from '@/modules/core/repositories/streams' @@ -47,13 +46,22 @@ import { import { updateMailchimpMemberTags } from '@/modules/auth/services/mailchimp' import { withOperationLogging } from '@/observability/domain/businessLogging' import { metaHelpers } from '@/modules/core/helpers/meta' -import { asMultiregionalOperation, asOperation } from '@/modules/shared/command' +import { + asMultiregionalOperation, + asOperation, + replicateFactory +} from '@/modules/shared/command' import { setUserOnboardingChoicesFactory } from '@/modules/core/services/users/tracking' import { getMixpanelClient } from '@/modules/shared/utils/mixpanel' import { throwIfAuthNotOk } from '@/modules/shared/helpers/errorHelper' import { getUserWorkspaceSeatsFactory } from '@/modules/workspacesCore/repositories/workspaces' -import { queryAllProjectsFactory } from '@/modules/core/services/projects' +import { + deleteProjectAndCommitsFactory, + queryAllProjectsFactory +} from '@/modules/core/services/projects' import { getAllRegisteredDbs } from '@/modules/multiregion/utils/dbSelector' +import { deleteProjectFactory } from '@/modules/core/repositories/projects' +import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits' const getUser = legacyGetUserFactory({ db }) const getUserByEmail = legacyGetUserByEmailFactory({ db }) @@ -309,7 +317,16 @@ export default { await asMultiregionalOperation( ({ mainDb, allDbs, emit }) => { const deleteUser = deleteUserFactory({ - deleteStream: deleteStreamFactory({ db: mainDb }), + deleteProjectAndCommits: deleteProjectAndCommitsFactory({ + // this is a bit of an overhead, we are issuing delete queries to all regions, + // instead of being selective and clever about figuring out the project DB and only + // deleting from main and the project db + deleteProject: replicateFactory(allDbs, deleteProjectFactory), + deleteProjectCommits: replicateFactory( + allDbs, + deleteProjectCommitsFactory + ) + }), logger: dbLogger, isLastAdminUser: isLastAdminUserFactory({ db: mainDb }), getUserDeletableStreams: getUserDeletableStreamsFactory({ db: mainDb }), @@ -359,7 +376,16 @@ export default { await asMultiregionalOperation( ({ mainDb, allDbs, emit }) => { const deleteUser = deleteUserFactory({ - deleteStream: deleteStreamFactory({ db: mainDb }), + deleteProjectAndCommits: deleteProjectAndCommitsFactory({ + // this is a bit of an overhead, we are issuing delete queries to all regions, + // instead of being selective and clever about figuring out the project DB and only + // deleting from main and the project db + deleteProject: replicateFactory(allDbs, deleteProjectFactory), + deleteProjectCommits: replicateFactory( + allDbs, + deleteProjectCommitsFactory + ) + }), logger: dbLogger, isLastAdminUser: isLastAdminUserFactory({ db: mainDb }), getUserDeletableStreams: getUserDeletableStreamsFactory({ db: mainDb }), diff --git a/packages/server/modules/core/index.ts b/packages/server/modules/core/index.ts index 5b4e7dc50..d85841aa4 100644 --- a/packages/server/modules/core/index.ts +++ b/packages/server/modules/core/index.ts @@ -45,6 +45,7 @@ import { import { getServerTotalModelCountFactory } from '@/modules/core/services/branch/retrieval' import { getServerTotalVersionCountFactory } from '@/modules/core/services/commit/retrieval' import { bullMonitoringRouterFactory } from '@/modules/core/rest/monitoring' +import { projectListenersFactory } from '@/modules/core/events/projectListeners' let stopTestSubs: (() => void) | undefined = undefined @@ -121,6 +122,11 @@ const coreModule: SpeckleModule<{ logger: coreLogger }) })() + + projectListenersFactory({ + eventBus: getEventBus(), + logger: coreLogger + })() } }, async finalize({ app }) { diff --git a/packages/server/modules/core/repositories/commits.ts b/packages/server/modules/core/repositories/commits.ts index 1a0363f03..2a07e693f 100644 --- a/packages/server/modules/core/repositories/commits.ts +++ b/packages/server/modules/core/repositories/commits.ts @@ -58,7 +58,8 @@ import type { LegacyGetPaginatedUserCommitsPage, LegacyGetPaginatedUserCommitsTotalCount, LegacyGetPaginatedStreamCommitsPage, - GetTotalVersionCount + GetTotalVersionCount, + DeleteProjectCommits } from '@/modules/core/domain/commits/operations' const tables = { @@ -156,6 +157,21 @@ export const deleteCommitFactory = return !!delCount } +export const deleteProjectCommitsFactory = + (deps: { db: Knex }): DeleteProjectCommits => + async ({ projectId }) => { + await deps.db.raw( + ` + DELETE FROM commits WHERE id IN ( + SELECT sc."commitId" FROM streams s + INNER JOIN stream_commits sc ON s.id = sc."streamId" + WHERE s.id = ? + ) + `, + [projectId] + ) + } + export const getBatchedStreamCommitsFactory = (deps: { db: Knex }): GetBatchedStreamCommits => (streamId: string, options?: Partial) => { diff --git a/packages/server/modules/core/repositories/streams.ts b/packages/server/modules/core/repositories/streams.ts index c5d7b9edf..c1d7661e9 100644 --- a/packages/server/modules/core/repositories/streams.ts +++ b/packages/server/modules/core/repositories/streams.ts @@ -79,7 +79,6 @@ import type { GetStream, GetStreamCollaborators, GetStreams, - DeleteStreamRecord, UpdateStreamRecord, RevokeStreamPermissions, GrantStreamPermissions, @@ -890,6 +889,7 @@ export const getUserStreamsCountFactory = const [res] = await countQuery return parseInt(res.count) } + export const createStreamFactory = (deps: { db: Knex }): SaveStream => async (input) => { @@ -961,23 +961,6 @@ export const getUserStreamCountsFactory = return mapValues(keyBy(results, 'userId'), (r) => parseInt(r.count)) } -export const deleteStreamFactory = - (deps: { db: Knex }): DeleteStreamRecord => - async (streamId: string) => { - // Delete stream commits (not automatically cascaded) - await deps.db.raw( - ` - DELETE FROM commits WHERE id IN ( - SELECT sc."commitId" FROM streams s - INNER JOIN stream_commits sc ON s.id = sc."streamId" - WHERE s.id = ? - ) - `, - [streamId] - ) - return await tables.streams(deps.db).where(Streams.col.id, streamId).del() - } - export const getStreamsSourceAppsFactory = (deps: { db: Knex }): GetStreamsSourceApps => async (streamIds: string[]) => { @@ -1295,6 +1278,7 @@ export const markOnboardingBaseStreamFactory = if (!stream) { throw new StreamNotFoundError(`Stream ${streamId} not found`) } + // this happens outside of the a multiregion ctx await updateStreamFactory(deps)({ id: streamId, name: 'Onboarding Stream Local Source - Do Not Delete' diff --git a/packages/server/modules/core/services/projects.ts b/packages/server/modules/core/services/projects.ts index 4405424ff..1f9e7e3ef 100644 --- a/packages/server/modules/core/services/projects.ts +++ b/packages/server/modules/core/services/projects.ts @@ -3,43 +3,32 @@ import { generateProjectName } from '@/modules/core/domain/projects/logic' import type { CreateProject, DeleteProject, - GetProject, + DeleteProjectAndCommits, QueryAllProjects, - StoreModel, StoreProject, - StoreProjectRole, - WaitForRegionProject + StoreProjectRole } from '@/modules/core/domain/projects/operations' import type { Project, StreamWithOptionalRole } from '@/modules/core/domain/streams/types' -import { - ProjectQueryError, - RegionalProjectCreationError -} from '@/modules/core/errors/projects' -import { StreamNotFoundError } from '@/modules/core/errors/stream' +import { ProjectQueryError } from '@/modules/core/errors/projects' import { ProjectVisibility } from '@/modules/core/graph/generated/graphql' import { mapGqlToDbProjectVisibility } from '@/modules/core/helpers/project' -import { isTestEnv } from '@/modules/shared/helpers/envHelper' import type { EventBusEmit } from '@/modules/shared/services/eventBus' -import { retry } from '@lifeomic/attempt' -import { Roles, TIME_MS } from '@speckle/shared' +import { Roles } from '@speckle/shared' import cryptoRandomString from 'crypto-random-string' import type { GetExplicitProjects } from '@/modules/core/domain/streams/operations' +import type { DeleteProjectCommits } from '@/modules/core/domain/commits/operations' export const createNewProjectFactory = ({ storeProject, storeProjectRole, - storeModel, - waitForRegionProject, emitEvent }: { storeProject: StoreProject storeProjectRole: StoreProjectRole - storeModel: StoreModel - waitForRegionProject: WaitForRegionProject emitEvent: EventBusEmit }): CreateProject => async ({ description, name, regionKey, visibility, workspaceId, ownerId }) => { @@ -62,20 +51,8 @@ export const createNewProjectFactory = await storeProject({ project }) const projectId = project.id - // if regionKey, we need to make sure it is actually written and synced - if (regionKey) { - await waitForRegionProject({ - projectId, - regionKey - }) - } + await storeProjectRole({ projectId, userId: ownerId, role: Roles.Stream.Owner }) - await storeModel({ - name: 'main', - description: 'default model', - projectId, - authorId: ownerId - }) await emitEvent({ eventName: ProjectEvents.Created, @@ -103,33 +80,6 @@ export const createNewProjectFactory = return project } -export const waitForRegionProjectFactory = - (deps: { - getProject: GetProject - deleteProject: DeleteProject - }): WaitForRegionProject => - async ({ projectId, regionKey, maxAttempts = 10 }) => { - try { - await retry( - async () => { - const replicatedProject = await deps.getProject({ projectId }) - if (!replicatedProject) throw new StreamNotFoundError() - }, - { maxAttempts, delay: isTestEnv() ? TIME_MS.second : undefined } - ) - } catch (err) { - if (err instanceof StreamNotFoundError) { - // delete from region - await deps.deleteProject({ projectId }) - throw new RegionalProjectCreationError(undefined, { - info: { projectId, regionKey } - }) - } - // else throw as is - throw err - } - } - export const queryAllProjectsFactory = ({ getExplicitProjects }: { @@ -163,3 +113,13 @@ export const queryAllProjectsFactory = ({ iterationCount++ } while (!!currentCursor) } + +export const deleteProjectAndCommitsFactory = + (deps: { + deleteProject: DeleteProject + deleteProjectCommits: DeleteProjectCommits + }): DeleteProjectAndCommits => + async (project) => { + await deps.deleteProjectCommits(project) + await deps.deleteProject(project) + } diff --git a/packages/server/modules/core/services/streams/management.ts b/packages/server/modules/core/services/streams/management.ts index 2a9d82e38..beb45fb53 100644 --- a/packages/server/modules/core/services/streams/management.ts +++ b/packages/server/modules/core/services/streams/management.ts @@ -23,11 +23,9 @@ import type { AddOrUpdateStreamCollaborator, CreateStream, DeleteStream, - DeleteStreamRecord, GetStream, IsStreamCollaborator, LegacyCreateStream, - LegacyUpdateStream, PermissionUpdateInput, RemoveStreamCollaborator, SaveStream, @@ -35,11 +33,13 @@ import type { UpdateStreamRecord, UpdateStreamRole } from '@/modules/core/domain/streams/operations' -import type { StoreBranch } from '@/modules/core/domain/branches/operations' import type { DeleteAllResourceInvites } from '@/modules/serverinvites/domain/operations' import type { EventBusEmit } from '@/modules/shared/services/eventBus' import { ProjectEvents } from '@/modules/core/domain/projects/events' -import type { StoreProjectRole } from '@/modules/core/domain/projects/operations' +import type { + DeleteProjectAndCommits, + StoreProjectRole +} from '@/modules/core/domain/projects/operations' import { generateProjectName } from '@/modules/core/domain/projects/logic' import cryptoRandomString from 'crypto-random-string' @@ -47,7 +47,6 @@ export const createStreamReturnRecordFactory = (deps: { createStream: SaveStream storeProjectRole: StoreProjectRole - createBranch: StoreBranch inviteUsersToProject: ReturnType emitEvent: EventBusEmit }): CreateStream => @@ -87,14 +86,6 @@ export const createStreamReturnRecordFactory = }) } - // Create a default main branch - await deps.createBranch({ - name: 'main', - description: 'default branch', - streamId, - authorId: ownerId - }) - // Invite contributors? if (!isProjectCreateInput(params) && params.withContributors?.length) { // TODO: should be injected in the resolver @@ -144,7 +135,7 @@ export const legacyCreateStreamFactory = */ export const deleteStreamAndNotifyFactory = (deps: { - deleteStream: DeleteStreamRecord + deleteProjectAndCommits: DeleteProjectAndCommits deleteAllResourceInvites: DeleteAllResourceInvites getStream: GetStream emitEvent: EventBusEmit @@ -178,7 +169,7 @@ export const deleteStreamAndNotifyFactory = resourceId: streamId, resourceType: ProjectInviteResourceType }), - deps.deleteStream(streamId) + deps.deleteProjectAndCommits({ projectId: streamId }) ]) return true } @@ -218,16 +209,6 @@ export const updateStreamAndNotifyFactory = return newStream } -/** - * @deprecated Use updateStreamAndNotifyFactory() or the repo fn directly - */ -export const legacyUpdateStreamFactory = - (deps: { updateStream: UpdateStreamRecord }): LegacyUpdateStream => - async (update) => { - const updatedStream = await deps.updateStream(update) - return updatedStream?.id || null - } - const isProjectUpdateRoleInput = ( i: PermissionUpdateInput ): i is ProjectUpdateRoleInput => has(i, 'projectId') diff --git a/packages/server/modules/core/services/users/management.ts b/packages/server/modules/core/services/users/management.ts index 558d52f75..9b8a9d362 100644 --- a/packages/server/modules/core/services/users/management.ts +++ b/packages/server/modules/core/services/users/management.ts @@ -39,10 +39,7 @@ import type { FindPrimaryEmailForUser, ValidateAndCreateUserEmail } from '@/modules/core/domain/userEmails/operations' -import type { - DeleteStreamRecord, - GetUserDeletableStreams -} from '@/modules/core/domain/streams/operations' +import type { GetUserDeletableStreams } from '@/modules/core/domain/streams/operations' import type { Logger } from '@/observability/logging' import type { DeleteAllUserInvites } from '@/modules/serverinvites/domain/operations' import type { GetServerInfo } from '@/modules/core/domain/server/operations' @@ -52,7 +49,10 @@ import { getFeatureFlags } from '@/modules/shared/helpers/envHelper' import type { GetUserWorkspaceSeatsFactory } from '@/modules/workspacesCore/domain/operations' import { WorkspaceEvents } from '@/modules/workspacesCore/domain/events' import { ProjectEvents } from '@/modules/core/domain/projects/events' -import type { QueryAllProjects } from '@/modules/core/domain/projects/operations' +import type { + DeleteProjectAndCommits, + QueryAllProjects +} from '@/modules/core/domain/projects/operations' import type { StreamWithOptionalRole } from '@/modules/core/repositories/streams' import { v4 } from 'uuid' @@ -293,7 +293,7 @@ export const findOrCreateUserFactory = export const deleteUserFactory = (deps: { - deleteStream: DeleteStreamRecord + deleteProjectAndCommits: DeleteProjectAndCommits logger: Logger isLastAdminUser: IsLastAdminUser getUserDeletableStreams: GetUserDeletableStreams @@ -312,7 +312,7 @@ export const deleteUserFactory = const streamIds = await deps.getUserDeletableStreams(id) for (const id of streamIds) { - await deps.deleteStream(id) + await deps.deleteProjectAndCommits({ projectId: id }) } // Delete all invites (they don't have a FK, so we need to do this manually) diff --git a/packages/server/modules/core/tests/helpers/creation.ts b/packages/server/modules/core/tests/helpers/creation.ts index ab54c03d3..447000fa0 100644 --- a/packages/server/modules/core/tests/helpers/creation.ts +++ b/packages/server/modules/core/tests/helpers/creation.ts @@ -17,7 +17,7 @@ export const buildTestProject = (overrides?: Partial): Project => updatedAt: new Date(), allowPublicComments: false, workspaceId: cryptoRandomString({ length: 10 }), - regionKey: cryptoRandomString({ length: 4 }), + regionKey: null, visibility: ProjectRecordVisibility.Private }, overrides diff --git a/packages/server/modules/core/tests/integration/projectRepositories.spec.ts b/packages/server/modules/core/tests/integration/projectRepositories.spec.ts index 05aa51fea..fd4e1b486 100644 --- a/packages/server/modules/core/tests/integration/projectRepositories.spec.ts +++ b/packages/server/modules/core/tests/integration/projectRepositories.spec.ts @@ -14,6 +14,10 @@ import { Roles } from '@speckle/shared' import { expect } from 'chai' import cryptoRandomString from 'crypto-random-string' import { assign } from 'lodash-es' +import type { DeleteProject } from '@/modules/core/domain/projects/operations' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' +import { logger } from '@/observability/logging' +import { getProjectReplicationDbs } from '@/modules/multiregion/utils/dbSelector' const createTestProject = (overrides?: Partial): Project => { const defaults: Project = { @@ -33,7 +37,16 @@ const createTestProject = (overrides?: Partial): Project => { const storeProject = storeProjectFactory({ db }) const getProject = getProjectFactory({ db }) -const deleteProject = deleteProjectFactory({ db }) +const deleteProject: DeleteProject = async ({ projectId }) => + asMultiregionalOperation( + async ({ allDbs }) => + await replicateFactory(allDbs, deleteProjectFactory)({ projectId }), + { + name: 'delete spec', + logger, + dbs: await getProjectReplicationDbs({ projectId }) + } + ) const storeProjectRole = storeProjectRoleFactory({ db }) describe('project repositories @core', () => { @@ -77,9 +90,6 @@ describe('project repositories @core', () => { }) }) describe('deleteProjectFactory creates a function, that', () => { - it('does nothing if project does not exist', async () => { - await deleteProject({ projectId: cryptoRandomString({ length: 10 }) }) - }) it('deletes the project', async () => { const project = createTestProject() await storeProject({ project }) diff --git a/packages/server/modules/core/tests/integration/subs.graph.spec.ts b/packages/server/modules/core/tests/integration/subs.graph.spec.ts index 25bab20c3..c90c723d3 100644 --- a/packages/server/modules/core/tests/integration/subs.graph.spec.ts +++ b/packages/server/modules/core/tests/integration/subs.graph.spec.ts @@ -10,6 +10,7 @@ import { } from '@/modules/core/repositories/branches' import { deleteCommitsFactory, + deleteProjectCommitsFactory, getCommitBranchFactory, getCommitFactory, getCommitsFactory, @@ -17,7 +18,6 @@ import { updateCommitFactory } from '@/modules/core/repositories/commits' import { - deleteStreamFactory, getCommitStreamFactory, getStreamFactory, getStreamRolesFactory, @@ -45,7 +45,10 @@ import { deleteStreamAndNotifyFactory, updateStreamAndNotifyFactory } from '@/modules/core/services/streams/management' -import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' +import { + getProjectDbClient, + getProjectReplicationDbs +} from '@/modules/multiregion/utils/dbSelector' import { deleteAllResourceInvitesFactory } from '@/modules/serverinvites/repositories/serverInvites' import { authorizeResolver } from '@/modules/shared' import { getEventBus } from '@/modules/shared/services/eventBus' @@ -97,18 +100,25 @@ import { faker } from '@faker-js/faker' import type { Optional, ServerScope } from '@speckle/shared' import { Roles, Scopes, WorkspacePlans } from '@speckle/shared' import { expect } from 'chai' +import { deleteProjectAndCommitsFactory } from '@/modules/core/services/projects' +import { deleteProjectFactory } from '@/modules/core/repositories/projects' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' +import type { UpdateStream } from '@/modules/core/domain/streams/operations' +import { logger } from '@/observability/logging' const validateStreamAccess = validateStreamAccessFactory({ authorizeResolver }) const isStreamCollaborator = isStreamCollaboratorFactory({ getStream: getStreamFactory({ db }) }) +// should be wrapped in a multiregion operator const buildDeleteProject = async (params: { projectId: string; ownerId: string }) => { const { projectId, ownerId } = params const projectDb = await getProjectDbClient({ projectId }) const deleteStreamAndNotify = deleteStreamAndNotifyFactory({ - deleteStream: deleteStreamFactory({ - db: projectDb + deleteProjectAndCommits: deleteProjectAndCommitsFactory({ + deleteProject: deleteProjectFactory({ db: projectDb }), + deleteProjectCommits: deleteProjectCommitsFactory({ db: projectDb }) }), emitEvent: getEventBus().emit, deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }), @@ -117,15 +127,23 @@ const buildDeleteProject = async (params: { projectId: string; ownerId: string } return async () => deleteStreamAndNotify(projectId, ownerId) } -const buildUpdateProject = async (params: { projectId: string }) => { - const { projectId } = params - const projectDB = await getProjectDbClient({ projectId }) - const updateStreamAndNotify = updateStreamAndNotifyFactory({ - getStream: getStreamFactory({ db: projectDB }), - updateStream: updateStreamFactory({ db: projectDB }), - emitEvent: getEventBus().emit - }) - return updateStreamAndNotify +const updateProject: UpdateStream = async (stream, me) => { + return asMultiregionalOperation( + async ({ mainDb, allDbs, emit }) => { + const updateStreamAndNotify = updateStreamAndNotifyFactory({ + getStream: getStreamFactory({ db: mainDb }), + updateStream: replicateFactory(allDbs, updateStreamFactory), + emitEvent: emit + }) + + return updateStreamAndNotify(stream, me) + }, + { + logger, + name: 'updateStream spec', + dbs: await getProjectReplicationDbs({ projectId: stream.id }) + } + ) } const buildUpdateModel = async (params: { projectId: string }) => { @@ -284,7 +302,6 @@ describe('Core GraphQL Subscriptions (New)', () => { const triggerProjectUpdate = async () => { const projectId = randomProject.id - const updateProject = await buildUpdateProject({ projectId }) await updateProject({ id: projectId, name: new Date().toISOString() }, me.id) } @@ -589,7 +606,6 @@ describe('Core GraphQL Subscriptions (New)', () => { workspaceId: myMainWorkspace.id } await createTestStreams([[myProj, me]]) - const updateProject = await buildUpdateProject({ projectId: myProj.id }) const onUserProjectsUpdated = await meSubClient.subscribe( OnProjectUpdatedDocument, @@ -631,7 +647,6 @@ describe('Core GraphQL Subscriptions (New)', () => { workspaceId: myMainWorkspace.id } await createTestStreams([[myProj, me]]) - const updateProject = await buildUpdateProject({ projectId: myProj.id }) const onUserProjectsUpdated = await meSubClient.subscribe( OnProjectUpdatedDocument, diff --git a/packages/server/modules/core/tests/streams.spec.ts b/packages/server/modules/core/tests/streams.spec.ts index b15f8b0b8..224b307b7 100644 --- a/packages/server/modules/core/tests/streams.spec.ts +++ b/packages/server/modules/core/tests/streams.spec.ts @@ -10,13 +10,9 @@ import { import type { BasicTestUser } from '@/test/authHelper' import { createTestUsers } from '@/test/authHelper' import type { BasicTestStream } from '@/test/speckle-helpers/streamHelper' -import { - createTestStream, - createTestStreams -} from '@/test/speckle-helpers/streamHelper' +import { createTestStream } from '@/test/speckle-helpers/streamHelper' import type { StreamWithOptionalRole } from '@/modules/core/repositories/streams' import { - deleteStreamFactory, getStreamFactory, getStreamRolesFactory, getStreamsCollaboratorsFactory, @@ -55,6 +51,7 @@ import { } from '@/modules/core/services/commit/management' import { createCommitFactory, + deleteProjectCommitsFactory, insertBranchCommitsFactory, insertStreamCommitsFactory } from '@/modules/core/repositories/commits' @@ -82,6 +79,12 @@ import { import { changeUserRoleFactory } from '@/modules/core/services/users/management' import { getServerInfoFactory } from '@/modules/core/repositories/server' import { createObjectFactory } from '@/modules/core/services/objects/management' +import { deleteProjectAndCommitsFactory } from '@/modules/core/services/projects' +import { deleteProjectFactory } from '@/modules/core/repositories/projects' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' +import { logger } from '@/observability/logging' +import { getProjectReplicationDbs } from '@/modules/multiregion/utils/dbSelector' +import type { UpdateStream } from '@/modules/core/domain/streams/operations' const getServerInfo = getServerInfoFactory({ db }) const getUser = getUserFactory({ db }) @@ -115,18 +118,46 @@ const createCommitByBranchName = createCommitByBranchNameFactory({ getStreamBranchByName: getStreamBranchByNameFactory({ db }), getBranchById: getBranchByIdFactory({ db }) }) -const deleteStream = deleteStreamAndNotifyFactory({ - deleteStream: deleteStreamFactory({ db }), - getStream, - emitEvent: getEventBus().emit, - deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }) -}) -const updateStream = updateStreamAndNotifyFactory({ - getStream, - updateStream: updateStreamFactory({ db }), - emitEvent: getEventBus().emit -}) +const deleteStream = async (projectId: string, userId: string) => + asMultiregionalOperation( + ({ allDbs, mainDb, emit }) => { + const deleteStreamAndNotify = deleteStreamAndNotifyFactory({ + deleteProjectAndCommits: deleteProjectAndCommitsFactory({ + deleteProject: replicateFactory(allDbs, deleteProjectFactory), + deleteProjectCommits: replicateFactory(allDbs, deleteProjectCommitsFactory) + }), + emitEvent: emit, + deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db: mainDb }), + getStream: getStreamFactory({ db: mainDb }) + }) + return deleteStreamAndNotify(projectId, userId) + }, + { + logger, + name: 'delete project spec', + description: `Cascade deleting a project in all regions`, + dbs: await getProjectReplicationDbs({ projectId }) + } + ) + +const updateStream: UpdateStream = async (stream, userId) => + asMultiregionalOperation( + async ({ mainDb, allDbs, emit }) => { + const updateStreamAndNotify = updateStreamAndNotifyFactory({ + getStream: getStreamFactory({ db: mainDb }), + updateStream: replicateFactory(allDbs, updateStreamFactory), + emitEvent: emit + }) + + return updateStreamAndNotify(stream, userId) + }, + { + logger, + name: 'updateStream', + dbs: await getProjectReplicationDbs({ projectId: stream.id }) + } + ) const revokeStreamPermissions = revokeStreamPermissionsFactory({ db }) const validateStreamAccess = validateStreamAccessFactory({ @@ -163,21 +194,8 @@ describe('Streams @core-streams', () => { id: '' } - const testStream: BasicTestStream = { - name: 'Test Stream 01', - description: 'wonderful test stream', - isPublic: true, - ownerId: '', - id: '' - } - - const secondTestStream: BasicTestStream = { - name: 'Test Stream 02', - description: 'wot', - isPublic: false, - ownerId: '', - id: '' - } + let testStream: BasicTestStream + let secondTestStream: BasicTestStream let quitters: (() => void)[] = [] @@ -190,10 +208,22 @@ describe('Streams @core-streams', () => { await beforeEachContext() await createTestUsers([userOne, userTwo]) - await createTestStreams([ - [testStream, userOne], - [secondTestStream, userOne] - ]) + testStream = await createTestStream( + { + name: 'Test Stream 01', + description: 'wonderful test stream', + isPublic: true + }, + userOne + ) + secondTestStream = await createTestStream( + { + name: 'Test Stream 02', + description: 'wot', + isPublic: false + }, + userOne + ) }) afterEach(() => { diff --git a/packages/server/modules/core/tests/unit/projects.spec.ts b/packages/server/modules/core/tests/unit/projects.spec.ts index fccf8a743..dd5ebce68 100644 --- a/packages/server/modules/core/tests/unit/projects.spec.ts +++ b/packages/server/modules/core/tests/unit/projects.spec.ts @@ -1,14 +1,8 @@ import { ProjectEvents } from '@/modules/core/domain/projects/events' import type { Project } from '@/modules/core/domain/streams/types' -import { RegionalProjectCreationError } from '@/modules/core/errors/projects' -import { StreamNotFoundError } from '@/modules/core/errors/stream' import { ProjectRecordVisibility } from '@/modules/core/helpers/types' -import { - createNewProjectFactory, - waitForRegionProjectFactory -} from '@/modules/core/services/projects' +import { createNewProjectFactory } from '@/modules/core/services/projects' import { isSpecificEventPayload } from '@/modules/shared/services/eventBus' -import { expectToThrow } from '@/test/assertionHelper' import type { StreamRoles } from '@speckle/shared' import { Roles } from '@speckle/shared' import { expect } from 'chai' @@ -24,10 +18,6 @@ describe('project services @core', () => { storedProject = project }, storeProjectRole: async () => {}, - storeModel: async () => {}, - waitForRegionProject: async () => { - expect.fail() - }, emitEvent: async () => {} }) const project = await createNewProject({ ownerId }) @@ -47,10 +37,6 @@ describe('project services @core', () => { storedProject = project }, storeProjectRole: async () => {}, - storeModel: async () => {}, - waitForRegionProject: async () => { - expect.fail() - }, emitEvent: async () => {} }) @@ -71,10 +57,6 @@ describe('project services @core', () => { storedProject = project }, storeProjectRole: async () => {}, - storeModel: async () => {}, - waitForRegionProject: async () => { - expect.fail() - }, emitEvent: async () => {} }) @@ -93,10 +75,6 @@ describe('project services @core', () => { storedProject = project }, storeProjectRole: async () => {}, - storeModel: async () => {}, - waitForRegionProject: async () => { - expect.fail() - }, emitEvent: async () => {} }) const project = await createNewProject({ ownerId, visibility: 'PRIVATE' }) @@ -105,72 +83,6 @@ describe('project services @core', () => { expect(storedProject!.visibility).to.eq(ProjectRecordVisibility.Private) expect(storedProject!.allowPublicComments).to.be.false }) - it('continues if the project is eventually synced', async () => { - const ownerId = cryptoRandomString({ length: 10 }) - - let queriedProjectId: string | undefined = undefined - let storedProject: Project | undefined = undefined - let storedProjectRole: - | { - projectId: string - userId: string - role: StreamRoles - } - | undefined = undefined - let storedModel: - | { - name: string - description: string | null - projectId: string - authorId: string - } - | undefined = undefined - let emitedEvent: string | undefined = undefined - let eventPayload: { project: Project; ownerId: string } | undefined = undefined - const createNewProject = createNewProjectFactory({ - storeProject: async ({ project }) => { - storedProject = project - }, - storeProjectRole: async (args) => { - storedProjectRole = args - }, - storeModel: async (args) => { - storedModel = args - }, - waitForRegionProject: async ({ projectId }) => { - queriedProjectId = projectId - }, - emitEvent: async (payload) => { - if (isSpecificEventPayload(payload, ProjectEvents.Created)) { - emitedEvent = payload.eventName - eventPayload = payload.payload - } - } - }) - const project = await createNewProject({ - ownerId, - regionKey: cryptoRandomString({ length: 10 }) - }) - expect(storedProject!.id).to.equal(queriedProjectId) - expect(project).deep.equal(storedProject) - expect(storedProjectRole).deep.equal({ - projectId: project.id, - userId: ownerId, - role: Roles.Stream.Owner - }) - expect(storedModel).deep.equal({ - name: 'main', - description: 'default model', - projectId: project.id, - authorId: ownerId - }) - expect(emitedEvent).to.equal(ProjectEvents.Created) - expect(eventPayload).deep.equal({ - ownerId, - project, - input: { description: '', name: project.name, visibility: 'PRIVATE' } - }) - }) it('successfully creates a project', async () => { const ownerId = cryptoRandomString({ length: 10 }) @@ -182,14 +94,6 @@ describe('project services @core', () => { role: StreamRoles } | undefined = undefined - let storedModel: - | { - name: string - description: string | null - projectId: string - authorId: string - } - | undefined = undefined let emitedEvent: string | undefined = undefined let eventPayload: { project: Project; ownerId: string } | undefined = undefined const createNewProject = createNewProjectFactory({ @@ -199,12 +103,6 @@ describe('project services @core', () => { storeProjectRole: async (args) => { storedProjectRole = args }, - storeModel: async (args) => { - storedModel = args - }, - waitForRegionProject: async () => { - expect.fail() - }, emitEvent: async (payload) => { if (isSpecificEventPayload(payload, ProjectEvents.Created)) { emitedEvent = payload.eventName @@ -219,12 +117,6 @@ describe('project services @core', () => { userId: ownerId, role: Roles.Stream.Owner }) - expect(storedModel).deep.equal({ - name: 'main', - description: 'default model', - projectId: project.id, - authorId: ownerId - }) expect(emitedEvent).to.equal(ProjectEvents.Created) expect(eventPayload).deep.equal({ ownerId, @@ -233,49 +125,4 @@ describe('project services @core', () => { }) }) }) - describe('waitForRegionProject creates a function, that', () => { - it('deletes the created project if getProject throws StreamNotFoundError', async () => { - const storedProjectId = cryptoRandomString({ length: 10 }) - let deletedProjectId: string | undefined = undefined - - const waitForRegionProject = waitForRegionProjectFactory({ - getProject: async () => { - throw new StreamNotFoundError() - }, - deleteProject: async ({ projectId }) => { - deletedProjectId = projectId - } - }) - const err = await expectToThrow(async () => { - await waitForRegionProject({ - projectId: storedProjectId, - regionKey: cryptoRandomString({ length: 10 }) - }) - }) - expect(storedProjectId).to.equal(deletedProjectId) - expect(err.message).to.equal(new RegionalProjectCreationError().message) - }) - it('just throws the error from the project getter', async () => { - const projectId = cryptoRandomString({ length: 10 }) - let deletedProjectId: string | undefined = undefined - const kabumm = 'kabumm' - - const waitForRegionProject = waitForRegionProjectFactory({ - getProject: async () => { - throw new Error(kabumm) - }, - deleteProject: async ({ projectId }) => { - deletedProjectId = projectId - } - }) - const err = await expectToThrow(async () => { - await waitForRegionProject({ - projectId, - regionKey: cryptoRandomString({ length: 10 }) - }) - }) - expect(deletedProjectId).to.be.undefined - expect(err.message).to.equal(kabumm) - }) - }) }) diff --git a/packages/server/modules/core/tests/users.spec.ts b/packages/server/modules/core/tests/users.spec.ts index b6ed9486b..e8c6535d9 100644 --- a/packages/server/modules/core/tests/users.spec.ts +++ b/packages/server/modules/core/tests/users.spec.ts @@ -25,7 +25,8 @@ import { insertStreamCommitsFactory, insertBranchCommitsFactory, legacyGetPaginatedStreamCommitsPageFactory, - getPaginatedBranchCommitsItemsFactory + getPaginatedBranchCommitsItemsFactory, + deleteProjectCommitsFactory } from '@/modules/core/repositories/commits' import { createCommitByBranchIdFactory, @@ -35,7 +36,6 @@ import { getStreamFactory, grantStreamPermissionsFactory, markCommitStreamUpdatedFactory, - deleteStreamFactory, getUserDeletableStreamsFactory, getExplicitProjects } from '@/modules/core/repositories/streams' @@ -104,9 +104,12 @@ import { getPaginatedBranchCommitsItemsByNameFactory } from '@/modules/core/serv import { getPaginatedStreamBranchesFactory } from '@/modules/core/services/branch/retrieval' import { createObjectFactory } from '@/modules/core/services/objects/management' import { getUserWorkspaceSeatsFactory } from '@/modules/workspacesCore/repositories/workspaces' -import { queryAllProjectsFactory } from '@/modules/core/services/projects' +import { + deleteProjectAndCommitsFactory, + queryAllProjectsFactory +} from '@/modules/core/services/projects' import { getTestRegionClients } from '@/modules/multiregion/tests/helpers' -import { asMultiregionalOperation } from '@/modules/shared/command' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' import type { ChangeUserPassword, CreateValidatedUser, @@ -114,6 +117,7 @@ import type { UpdateUserAndNotify } from '@/modules/core/domain/users/operations' import { createTestStream } from '@/test/speckle-helpers/streamHelper' +import { deleteProjectFactory } from '@/modules/core/repositories/projects' const getServerInfo = getServerInfoFactory({ db }) const getUser = legacyGetUserFactory({ db }) @@ -252,7 +256,13 @@ const deleteUser: DeleteUser = async (...input) => asMultiregionalOperation( ({ mainDb, allDbs, emit }) => { const deleteUser = deleteUserFactory({ - deleteStream: deleteStreamFactory({ db: mainDb }), + deleteProjectAndCommits: deleteProjectAndCommitsFactory({ + // this is a bit of an overhead, we are issuing delete queries to all regions, + // instead of being selective and clever about figuring out the project DB and only + // deleting from main and the project db + deleteProject: replicateFactory(allDbs, deleteProjectFactory), + deleteProjectCommits: replicateFactory(allDbs, deleteProjectCommitsFactory) + }), logger: dbLogger, isLastAdminUser: isLastAdminUserFactory({ db: mainDb }), getUserDeletableStreams: getUserDeletableStreamsFactory({ db: mainDb }), diff --git a/packages/server/modules/core/tests/usersAdmin.spec.ts b/packages/server/modules/core/tests/usersAdmin.spec.ts index dd9696d31..368a0730d 100644 --- a/packages/server/modules/core/tests/usersAdmin.spec.ts +++ b/packages/server/modules/core/tests/usersAdmin.spec.ts @@ -37,7 +37,6 @@ import { deleteAllUserInvitesFactory } from '@/modules/serverinvites/repositories/serverInvites' import { - deleteStreamFactory, getExplicitProjects, getUserDeletableStreamsFactory } from '@/modules/core/repositories/streams' @@ -46,9 +45,17 @@ import { getServerInfoFactory } from '@/modules/core/repositories/server' import { getEventBus } from '@/modules/shared/services/eventBus' import { expect } from 'chai' import { getUserWorkspaceSeatsFactory } from '@/modules/workspacesCore/repositories/workspaces' -import { queryAllProjectsFactory } from '@/modules/core/services/projects' +import { + deleteProjectAndCommitsFactory, + queryAllProjectsFactory +} from '@/modules/core/services/projects' import type { BasicTestUser } from '@/test/authHelper' import { createTestUser } from '@/test/authHelper' +import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits' +import { deleteProjectFactory } from '@/modules/core/repositories/projects' +import type { DeleteUser } from '@/modules/core/domain/users/operations' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' +import { getTestRegionClients } from '@/modules/multiregion/tests/helpers' const getUsers = legacyGetPaginatedUsersFactory({ db }) const countUsers = legacyGetPaginatedUsersCountFactory({ db }) @@ -82,19 +89,45 @@ const createUser = createUserFactory({ }), emitEvent: getEventBus().emit }) -const deleteUser = deleteUserFactory({ - deleteStream: deleteStreamFactory({ db }), - logger: dbLogger, - isLastAdminUser: isLastAdminUserFactory({ db }), - getUserDeletableStreams: getUserDeletableStreamsFactory({ db }), - queryAllProjects: queryAllProjectsFactory({ - getExplicitProjects: getExplicitProjects({ db }) - }), - getUserWorkspaceSeats: getUserWorkspaceSeatsFactory({ db }), - deleteAllUserInvites: deleteAllUserInvitesFactory({ db }), - deleteUserRecord: deleteUserRecordFactory({ db }), - emitEvent: getEventBus().emit -}) + +const deleteUser: DeleteUser = async (...input) => + asMultiregionalOperation( + ({ mainDb, allDbs, emit }) => { + const deleteUser = deleteUserFactory({ + deleteProjectAndCommits: deleteProjectAndCommitsFactory({ + // this is a bit of an overhead, we are issuing delete queries to all regions, + // instead of being selective and clever about figuring out the project DB and only + // deleting from main and the project db + deleteProject: replicateFactory(allDbs, deleteProjectFactory), + deleteProjectCommits: replicateFactory(allDbs, deleteProjectCommitsFactory) + }), + logger: dbLogger, + isLastAdminUser: isLastAdminUserFactory({ db: mainDb }), + getUserDeletableStreams: getUserDeletableStreamsFactory({ db: mainDb }), + queryAllProjects: queryAllProjectsFactory({ + getExplicitProjects: getExplicitProjects({ db: mainDb }) + }), + getUserWorkspaceSeats: getUserWorkspaceSeatsFactory({ db: mainDb }), + deleteAllUserInvites: deleteAllUserInvitesFactory({ db: mainDb }), + deleteUserRecord: async (params) => { + const [res] = await Promise.all( + allDbs.map((db) => deleteUserRecordFactory({ db })(params)) + ) + + return res + }, + emitEvent: emit + }) + + return deleteUser(...input) + }, + { + logger: dbLogger, + name: 'delete user spec', + dbs: await getTestRegionClients() + } + ) + const getUserRole = getUserRoleFactory({ db }) const buildChangeUserRole = (guestModeEnabled = false) => changeUserRoleFactory({ diff --git a/packages/server/modules/cross-server-sync/index.ts b/packages/server/modules/cross-server-sync/index.ts index af8238021..383f17cf8 100644 --- a/packages/server/modules/cross-server-sync/index.ts +++ b/packages/server/modules/cross-server-sync/index.ts @@ -31,15 +31,12 @@ import { insertBranchCommitsFactory, insertStreamCommitsFactory } from '@/modules/core/repositories/commits' -import { storeModelFactory } from '@/modules/core/repositories/models' import { getObjectFactory, getStreamObjectsFactory, storeSingleObjectIfNotFoundFactory } from '@/modules/core/repositories/objects' import { - deleteProjectFactory, - getProjectFactory, storeProjectFactory, storeProjectRoleFactory } from '@/modules/core/repositories/projects' @@ -59,10 +56,7 @@ import { getViewerResourcesFromLegacyIdentifiersFactory } from '@/modules/core/services/commit/viewerResources' import { createObjectFactory } from '@/modules/core/services/objects/management' -import { - createNewProjectFactory, - waitForRegionProjectFactory -} from '@/modules/core/services/projects' +import { createNewProjectFactory } from '@/modules/core/services/projects' import { downloadCommitFactory } from '@/modules/cross-server-sync/services/commit' import { ensureOnboardingProjectFactory } from '@/modules/cross-server-sync/services/onboardingProject' import { downloadProjectFactory } from '@/modules/cross-server-sync/services/project' @@ -156,13 +150,9 @@ const crossServerSyncModule: SpeckleModule = { }) const createNewProject = createNewProjectFactory({ + // This happens always outside of multiregion ctx storeProject: storeProjectFactory({ db }), - storeModel: storeModelFactory({ db }), storeProjectRole: storeProjectRoleFactory({ db }), - waitForRegionProject: waitForRegionProjectFactory({ - getProject: getProjectFactory({ db }), - deleteProject: deleteProjectFactory({ db }) - }), emitEvent: getEventBus().emit }) diff --git a/packages/server/modules/multiregion/services/projectRegion.ts b/packages/server/modules/multiregion/services/projectRegion.ts index 7fde65e47..7db9f821e 100644 --- a/packages/server/modules/multiregion/services/projectRegion.ts +++ b/packages/server/modules/multiregion/services/projectRegion.ts @@ -88,21 +88,21 @@ export const updateProjectRegionKeyFactory = } export type GetRegionDb = (args: { regionKey: string }) => Promise -type GetDefaultDb = () => Knex - -export type GetProjectDb = (args: { projectId: string }) => Promise +export type GetProjectDb = (args: { + projectId: string +}) => T | Promise export const getProjectDbClientFactory = - ({ + ({ getProjectRegionKey, getDefaultDb, getRegionDb }: { getProjectRegionKey: GetProjectRegionKey - getDefaultDb: GetDefaultDb + getDefaultDb: () => T getRegionDb: GetRegionDb - }): GetProjectDb => + }): GetProjectDb => async ({ projectId }) => { const regionKey = await getProjectRegionKey({ projectId }) if (!regionKey) return getDefaultDb() - return getRegionDb({ regionKey }) + return getRegionDb({ regionKey }) as Promise } diff --git a/packages/server/modules/multiregion/services/queue.ts b/packages/server/modules/multiregion/services/queue.ts index c55b362bd..74f3a0df8 100644 --- a/packages/server/modules/multiregion/services/queue.ts +++ b/packages/server/modules/multiregion/services/queue.ts @@ -9,20 +9,22 @@ import { MultiRegionInvalidJobError, MultiRegionNotYetImplementedError } from '@/modules/multiregion/errors' -import { getProjectDbClient, getRegionDb } from '@/modules/multiregion/utils/dbSelector' +import { + getProjectDbClient, + getRegionDb, + getReplicationDbs +} from '@/modules/multiregion/utils/dbSelector' import { getProjectObjectStorage, getRegionObjectStorage } from '@/modules/multiregion/utils/blobStorageSelector' import { - updateProjectRegionFactory, + moveProjectToRegionFactory, validateProjectRegionCopyFactory } from '@/modules/workspaces/services/projectRegions' import { db } from '@/db/knex' import { - deleteProjectFactory, getProjectFactory, - storeProjectFactory, storeProjectRolesFactory } from '@/modules/core/repositories/projects' import { getAvailableRegionsFactory } from '@/modules/workspaces/services/regions' @@ -36,7 +38,6 @@ import { import { updateProjectRegionKeyFactory } from '@/modules/multiregion/services/projectRegion' import { getGenericRedis } from '@/modules/shared/redis/redis' import { initializeQueue as setupQueue } from '@speckle/shared/queue' -import { getEventBus } from '@/modules/shared/services/eventBus' import { copyWorkspaceFactory, copyProjectsFactory, @@ -56,9 +57,9 @@ import { } from '@/modules/workspaces/repositories/projectRegions' import { withTransaction } from '@/modules/shared/helpers/dbHelper' import { getRedisUrl } from '@/modules/shared/helpers/envHelper' -import { waitForRegionProjectFactory } from '@/modules/core/services/projects' import { chunk } from 'lodash-es' import { getStreamCollaboratorsFactory } from '@/modules/core/repositories/streams' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' const MULTIREGION_QUEUE_NAME = isTestEnv() ? `test:multiregion:${cryptoRandomString({ length: 5 })}` @@ -173,9 +174,9 @@ export const startQueue = async () => { .private // Move project to target region - const project = await withTransaction( + await withTransaction( async ({ db: targetDbTrx }) => { - const updateProjectRegion = updateProjectRegionFactory({ + const moveProjectToRegion = moveProjectToRegionFactory({ getProject: getProjectFactory({ db: sourceDb }), getAvailableRegions: getAvailableRegionsFactory({ getRegions: getRegionsFactory({ db }), @@ -230,45 +231,40 @@ export const startQueue = async () => { }), countProjectComments: countProjectCommentsFactory({ db: sourceDb }), countProjectWebhooks: countProjectWebhooksFactory({ db: sourceDb }) - }), - updateProjectRegionKey: updateProjectRegionKeyFactory({ - upsertProjectRegionKey: upsertProjectRegionKeyFactory({ - db: targetDbTrx - }), - cacheDeleteRegionKey: deleteRegionKeyFromCacheFactory({ - redis: getGenericRedis() - }), - emitEvent: getEventBus().emit }) }) - return updateProjectRegion({ projectId, regionKey }) + await moveProjectToRegion({ projectId, regionKey }) }, - { db: targetDb } + { + db: targetDb + } + ) + + // Update project region in dbs and update relevant caches + const project = await asMultiregionalOperation( + async ({ allDbs, emit }) => + updateProjectRegionKeyFactory({ + upsertProjectRegionKey: replicateFactory( + allDbs, + upsertProjectRegionKeyFactory + ), + cacheDeleteRegionKey: deleteRegionKeyFromCacheFactory({ + redis: getGenericRedis() + }), + emitEvent: emit + })({ projectId, regionKey }), + { + name: 'updateProjectRegion', + description: 'Update project region in db and update relevant caches', + logger, + dbs: await getReplicationDbs({ regionKey }) + } ) // Grab project roles for later reinstating const projectRoles = await getStreamCollaboratorsFactory({ db })(project.id) - // Delete project in main db to "unblock" replication - await deleteProjectFactory({ db })({ projectId: project.id }) - - try { - // Wait for replication from regional db - await waitForRegionProjectFactory({ - getProject: getProjectFactory({ db }), - deleteProject: deleteProjectFactory({ db: targetDb }) - })({ - projectId: project.id, - regionKey, - maxAttempts: 100 - }) - } catch (err) { - // Failed to delete project or await replication, reset project state in main db - await storeProjectFactory({ db })({ project }) - throw err - } - // Reinstate project acl records for (const roles of chunk(projectRoles, 10_000)) { await storeProjectRolesFactory({ db })({ diff --git a/packages/server/modules/multiregion/tests/e2e/serverAdmin.graph.spec.ts b/packages/server/modules/multiregion/tests/e2e/serverAdmin.graph.spec.ts index 915a8674f..1e595b370 100644 --- a/packages/server/modules/multiregion/tests/e2e/serverAdmin.graph.spec.ts +++ b/packages/server/modules/multiregion/tests/e2e/serverAdmin.graph.spec.ts @@ -1,7 +1,6 @@ import { mainDb } from '@/db/knex' import { getMainObjectStorage } from '@/modules/blobstorage/clients/objectStorage' import type { DataRegionsConfig } from '@/modules/multiregion/domain/types' -import { isMultiRegionEnabled } from '@/modules/multiregion/helpers' import { getMultiRegionConfig, setMultiRegionConfig @@ -21,8 +20,10 @@ import { import type { ExecuteOperationOptions, TestApolloServer } from '@/test/graphqlHelper' import { testApolloServer } from '@/test/graphqlHelper' import { beforeEachContext, getRegionKeys } from '@/test/hooks' - -import { truncateRegionsSafely } from '@/test/speckle-helpers/regions' +import { + isMultiRegionTestMode, + truncateRegionsSafely +} from '@/test/speckle-helpers/regions' import { Roles } from '@speckle/shared' import type { MultiRegionConfig } from '@speckle/shared/environment/db' import { getConnectionSettings } from '@speckle/shared/environment/db' @@ -30,7 +31,7 @@ import { expect } from 'chai' import { merge } from 'lodash-es' import { resetRegisteredRegions } from '@/modules/multiregion/utils/dbSelector' -const isEnabled = isMultiRegionEnabled() +const isEnabled = isMultiRegionTestMode() isEnabled ? describe('Multi Region Server Settings @multiregion', () => { diff --git a/packages/server/modules/multiregion/tests/helpers.ts b/packages/server/modules/multiregion/tests/helpers.ts index e425a4f09..3eae3a7d7 100644 --- a/packages/server/modules/multiregion/tests/helpers.ts +++ b/packages/server/modules/multiregion/tests/helpers.ts @@ -1,5 +1,8 @@ import { db } from '@/db/knex' -import { getRegisteredRegionClients } from '@/modules/multiregion/utils/dbSelector' +import { + getRegisteredRegionClients, + getReplicationDbs +} from '@/modules/multiregion/utils/dbSelector' import { isMultiRegionTestMode } from '@/test/speckle-helpers/regions' import type { Knex } from 'knex' @@ -10,3 +13,19 @@ export async function getTestRegionClients(): Promise<[Knex, ...Knex[]]> { const regionDbs = Object.values(regionClients) return [db, ...regionDbs] } + +export async function getTestRegionClientsForProject({ + regionKey +}: { + regionKey?: string +}): Promise<[Knex, ...Knex[]]> { + if (!isMultiRegionTestMode()) return [db] + + if (!regionKey) return [db] + const regionClients = await getRegisteredRegionClients() + + const regionDb = regionClients[regionKey] + if (!regionDb) return [db] + + return await getReplicationDbs({ regionKey }) +} diff --git a/packages/server/modules/multiregion/utils/dbSelector.ts b/packages/server/modules/multiregion/utils/dbSelector.ts index 02ed6e04c..842e1b4cb 100644 --- a/packages/server/modules/multiregion/utils/dbSelector.ts +++ b/packages/server/modules/multiregion/utils/dbSelector.ts @@ -1,4 +1,4 @@ -import { db } from '@/db/knex' +import { db, mainDb } from '@/db/knex' import type { GetProjectDb, GetRegionDb @@ -7,7 +7,6 @@ import { getProjectDbClientFactory } from '@/modules/multiregion/services/projec import type { Knex } from 'knex' import { getRegionFactory } from '@/modules/multiregion/repositories' import { - DatabaseError, LogicError, MisconfiguredEnvironmentError, TestOnlyLogicError @@ -20,14 +19,14 @@ import { getMainRegionConfig } from '@/modules/multiregion/regionConfig' import type { MaybeNullOrUndefined } from '@speckle/shared' -import { ensureError, TIME_MS, wait } from '@speckle/shared' +import { TIME_MS, wait } from '@speckle/shared' import { isTestEnv } from '@/modules/shared/helpers/envHelper' import { migrateDbToLatest } from '@/db/migrations' import { getProjectRegionKey, getRegisteredRegionConfigs } from '@/modules/multiregion/utils/regionSelector' -import { get, mapValues } from 'lodash-es' +import { mapValues } from 'lodash-es' import { isMultiRegionEnabled } from '@/modules/multiregion/helpers' import { logger } from '@/observability/logging' @@ -77,11 +76,42 @@ const initializeDbGetter = async (): Promise => { } // this guy is the star of the show here +// returns where the project is located export const getProjectDbClient: GetProjectDb = async ({ projectId }) => { if (!getter) getter = await initializeDbGetter() return await getter({ projectId }) } +// helper for replication logic +// returns the replication strategy ( locations where data need to be updated at the same time) +// instead of just the target db +export const getProjectReplicationDbs = async ({ + projectId +}: { + projectId: string +}): Promise<[Knex, ...Knex[]]> => { + const getDefaultDb = () => undefined + const projectDb = await getProjectDbClientFactory({ + getDefaultDb, + getRegionDb, + getProjectRegionKey + })({ projectId }) + + return [mainDb, ...(projectDb ? [projectDb] : [])] +} + +export const getReplicationDbs = async ({ + regionKey +}: { + regionKey: string | null +}): Promise<[Knex, ...Knex[]]> => { + if (!regionKey) { + return [mainDb] + } + + return [mainDb, await getRegionDb({ regionKey })] +} + // the default region key is a config value, we're caching this globally let defaultRegionKeyCache: string | null | undefined = undefined @@ -102,6 +132,7 @@ export const getValidDefaultProjectRegionKey = async (): Promise type RegionClients = Record let registeredRegionClients: RegionClients | undefined = undefined +export type DatabaseClient = { client: Knex; isMain: boolean; regionKey: string } /** * Idempotently initialize registered region (in db) Knex clients @@ -135,9 +166,7 @@ export const getRegisteredRegionClients = async (): Promise => { export const getRegisteredDbClients = async (): Promise => Object.values(await getRegisteredRegionClients()) -export const getAllRegisteredDbClients = async (): Promise< - Array<{ client: Knex; isMain: boolean; regionKey: string }> -> => { +export const getAllRegisteredDbClients = async (): Promise> => { const mainDb = db const regionDbs: RegionClients = isMultiRegionEnabled() ? await getRegisteredRegionClients() @@ -190,18 +219,22 @@ export const initializeRegion: InitializeRegion = async ({ regionKey }) => { ? 'require' : 'disable' - await dropUserReplicationIfExists({ + await dropReplicationIfExists({ from: mainDb, to: regionDb, regionName: regionKey, - sslmode + sslmode, + subName: createPubSubName(`userssub_${regionKey}`), + pubName: createPubSubName('userspub') }) - await setUpProjectReplication({ + await dropReplicationIfExists({ from: regionDb, to: mainDb, regionName: regionKey, - sslmode + sslmode, + subName: createPubSubName(`projectsub_${regionKey}`), + pubName: createPubSubName('projectpub') }) } @@ -219,14 +252,13 @@ interface ReplicationArgs { regionName: string } -const dropUserReplicationIfExists = async ({ +const dropReplicationIfExists = async ({ from, to, - regionName -}: ReplicationArgs): Promise => { - const subName = createPubSubName(`userssub_${regionName}`) - const pubName = createPubSubName('userspub') - + regionName, + subName, + pubName +}: ReplicationArgs & { subName: string; pubName: string }): Promise => { try { const { rows: pubExist } = await from.public.raw( `SELECT pubname FROM pg_publication WHERE pubname = '${pubName}';` @@ -278,107 +310,6 @@ const dropUserReplicationIfExists = async ({ return } -const setUpProjectReplication = async ({ - from, - to, - regionName, - sslmode -}: ReplicationArgs): Promise => { - const subName = createPubSubName(`projectsub_${regionName}`) - const pubName = createPubSubName('projectpub') - - try { - await from.public.raw(`CREATE PUBLICATION ${pubName} FOR TABLE streams;`) - } catch (err) { - if (!(err instanceof Error)) - throw new DatabaseError( - 'Could not create publication {pubName} when setting up project replication for region {regionName}', - from.public, - { - cause: ensureError( - sanitizeError(err), - 'Unknown database error when creating publication' - ), - info: { pubName, regionName } - } - ) - if ( - !err.message.includes('already exists') && - !err.message.includes('duplicate key value violates unique constraint') - ) - throw new DatabaseError( - 'Unknown error while creating publication {pubName} when setting up project replication for region {regionName}', - from.public, - { - cause: ensureError( - sanitizeError(err), - 'Unknown database error when creating publication' - ), - info: { pubName, regionName } - } - ) - } - - const fromUrl = new URL( - from.private - ? from.private.client.config.connection.connectionString - : from.public.client.config.connection.connectionString - ) - const port = fromUrl.port ? fromUrl.port : '5432' - const fromDbName = fromUrl.pathname.replace('/', '') - const rawSqeel = `SELECT * FROM aiven_extras.pg_create_subscription( - ?, - ?, - ?, - ?, - TRUE, - TRUE - );` - try { - await to.public.raw('CREATE EXTENSION IF NOT EXISTS "aiven_extras"') - await to.public.raw(rawSqeel, [ - subName, - `dbname=${fromDbName} host=${fromUrl.hostname} port=${port} sslmode=${sslmode} user=${fromUrl.username} password=${fromUrl.password}`, - pubName, - subName - ]) - } catch (err) { - if (!(err instanceof Error)) - throw new DatabaseError( - 'Could not create subscription {subName} to {pubName} when setting up project replication for region {regionName}', - to.public, - { - cause: ensureError( - sanitizeError(err), - 'Unknown database error when creating subscription' - ), - info: { subName, pubName, regionName } - } - ) - if ( - !err.message.includes('already exists') && - !err.message.includes('duplicate key value violates unique constraint') - ) - throw new DatabaseError( - 'Unknown error while creating subscription {subName} to {pubName} when setting up project replication for region {regionName}', - to.public, - { - cause: ensureError( - sanitizeError(err), - 'Unknown database error when creating subscription' - ), - info: { subName, pubName, regionName } - } - ) - } -} - -const sanitizeError = (err: unknown): unknown => { - if (!err) return err - if ((get(err, 'where') as unknown as string).includes('password=')) - return { ...err, where: '[REDACTED AS IT CONTAINS CONNECTION STRING]' } -} - export const resetRegisteredRegions = () => { if (!isTestEnv()) { throw new TestOnlyLogicError() diff --git a/packages/server/modules/previews/tests/unit/responses.spec.ts b/packages/server/modules/previews/tests/unit/responses.spec.ts index c6ed7d7fd..9a0e938b4 100644 --- a/packages/server/modules/previews/tests/unit/responses.spec.ts +++ b/packages/server/modules/previews/tests/unit/responses.spec.ts @@ -1,8 +1,8 @@ import { expect } from 'chai' import { responseHandlerFactory } from '@/modules/previews/services/responses' -import { testLogger as logger } from '@/observability/logging' import { buildConsumePreviewResult } from '@/modules/previews/resultListener' import cryptoRandomString from 'crypto-random-string' +import { logger } from '@/observability/logging' describe('object preview @previews', () => { describe('responseHandlerFactory creates a function, that', () => { diff --git a/packages/server/modules/shared/command.ts b/packages/server/modules/shared/command.ts index 8236c7d39..1423dc017 100644 --- a/packages/server/modules/shared/command.ts +++ b/packages/server/modules/shared/command.ts @@ -147,14 +147,6 @@ export const asMultiregionalOperation = async ( * @description reference to the main db (first one passed in the array) */ mainDb: Knex - /** - * @description reference for second db (first one not main) - */ - regionDb: Knex - /** - * @description reference for all regions (all dbs except the main one) - */ - regionDbs: Knex[] emit: EventBusEmit }) => MaybeAsync, params: { @@ -209,8 +201,6 @@ export const asMultiregionalOperation = async ( result = await operation({ mainDb: mainDbTx, allDbs: trxs, - regionDb: regionDbsTx[0], - regionDbs: regionDbsTx, emit }) @@ -267,3 +257,20 @@ export const asMultiregionalOperation = async ( } ) } + +/** + * Helper intended to be used with asMultiregionOperation that returns a curried function + * to apply a factory built with { db: Knex} to multiple dbs, with same input returning the first result. + * @param dbs Knex[] + * @param factory a function that recieves a db constructor + * @returns the result of the first database + */ +export function replicateFactory( + dbs: Knex[], + factory: (context: { db: Knex }) => (...args: Args) => Promise +): (...args: Args) => Promise { + return async (...args: Args): Promise => { + const [result] = await Promise.all(dbs.map((db) => factory({ db })(...args))) + return result + } +} diff --git a/packages/server/modules/shared/test/dbHelper.spec.ts b/packages/server/modules/shared/test/dbHelper.spec.ts index 8a8d80fb2..c35093b1a 100644 --- a/packages/server/modules/shared/test/dbHelper.spec.ts +++ b/packages/server/modules/shared/test/dbHelper.spec.ts @@ -176,9 +176,9 @@ isMultiRegionTestMode() await manyParallelCreates() const [{ count }] = await db('users').count() - expect(count).to.eql(1000) + expect(count).to.eql('1000') - await sleep(1000) // just in case + await sleep(50) const connectionsUsedAfter = main.client.pool.numUsed() expect(connectionsUsedAfter).to.be.lte(connectionsUsedBefore) diff --git a/packages/server/modules/workspaces/domain/operations.ts b/packages/server/modules/workspaces/domain/operations.ts index 7b1aadf3f..2f95b0420 100644 --- a/packages/server/modules/workspaces/domain/operations.ts +++ b/packages/server/modules/workspaces/domain/operations.ts @@ -488,10 +488,10 @@ export type DenyWorkspaceJoinRequest = ( /** * Updates project region and moves all regional data to target regional db */ -export type UpdateProjectRegion = (params: { +export type MoveProjectToRegion = (params: { projectId: string regionKey: string -}) => Promise +}) => Promise /** * Given a count of objects successfully copied to another region, confirm that these counts diff --git a/packages/server/modules/workspaces/graph/resolvers/workspaces.ts b/packages/server/modules/workspaces/graph/resolvers/workspaces.ts index bb4d944bd..96e80171c 100644 --- a/packages/server/modules/workspaces/graph/resolvers/workspaces.ts +++ b/packages/server/modules/workspaces/graph/resolvers/workspaces.ts @@ -5,7 +5,6 @@ import { removePrivateFields } from '@/modules/core/helpers/userHelper' import { updateProjectFactory, getStreamFactory, - deleteStreamFactory, revokeStreamPermissionsFactory, grantStreamPermissionsFactory, getStreamCollaboratorsFactory, @@ -151,9 +150,18 @@ import { import { updateStreamRoleAndNotifyFactory } from '@/modules/core/services/streams/management' import { getUserFactory, getUsersFactory } from '@/modules/core/repositories/users' import { getServerInfoFactory } from '@/modules/core/repositories/server' -import { asOperation, commandFactory } from '@/modules/shared/command' +import { + asMultiregionalOperation, + asOperation, + commandFactory, + replicateFactory +} from '@/modules/shared/command' import { throwIfRateLimitedFactory } from '@/modules/core/utils/ratelimiter' -import { getProjectDbClient, getRegionDb } from '@/modules/multiregion/utils/dbSelector' +import { + getAllRegisteredDbs, + getProjectDbClient, + getProjectReplicationDbs +} from '@/modules/multiregion/utils/dbSelector' import { listUserExpiredSsoSessionsFactory, listWorkspaceSsoMembershipsByUserEmailFactory @@ -180,7 +188,6 @@ import { getWorkspaceWithPlanFactory, upsertWorkspacePlanFactory } from '@/modules/gatekeeper/repositories/billing' -import type { Knex } from 'knex' import { getPaginatedItemsFactory } from '@/modules/shared/services/paginatedItems' import { BadRequestError, UnauthorizedError } from '@/modules/shared/errors' import { @@ -193,6 +200,7 @@ import { } from '@/modules/workspaces/repositories/workspaceJoinRequests' import { sendWorkspaceJoinRequestReceivedEmailFactory } from '@/modules/workspaces/services/workspaceJoinRequestEmails/received' import { + deleteProjectFactory, getProjectFactory, getUserProjectRolesFactory } from '@/modules/core/repositories/projects' @@ -229,10 +237,13 @@ import { } from '@/modules/serverinvites/services/coreFinalization' import { WorkspaceInvitesLimit } from '@/modules/workspaces/domain/constants' import { copyWorkspaceFactory } from '@/modules/workspaces/repositories/projectRegions' -import { queryAllProjectsFactory } from '@/modules/core/services/projects' +import { + deleteProjectAndCommitsFactory, + queryAllProjectsFactory +} from '@/modules/core/services/projects' import { WorkspacePlanNotFoundError } from '@/modules/gatekeeper/errors/billing' +import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits' -const eventBus = getEventBus() const getServerInfo = getServerInfoFactory({ db }) const getUser = getUserFactory({ db }) const getUsers = getUsersFactory({ db }) @@ -805,38 +816,36 @@ export default FF_WORKSPACES_MODULE_ENABLED } } - const deleteWorkspaceFrom = (db: Knex) => - deleteWorkspaceFactory({ - deleteWorkspace: repoDeleteWorkspaceFactory({ db }), - deleteProject: deleteStreamFactory({ db }), - deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }), - queryAllProjects: queryAllProjectsFactory({ - getExplicitProjects: getExplicitProjects({ db }) - }), - deleteSsoProvider: deleteSsoProviderFactory({ db }), - emitWorkspaceEvent: getEventBus().emit - }) + // this is a bit of an overhead, we are issuing delete queries to all regions, + // instead of being selective and clever about figuring out the project DB and only + // deleting from main and the project db + // while workspace must be deleted from all regions - // this should be turned into a get all regions and map over the regions... - const region = await getDefaultRegionFactory({ db })({ workspaceId }) - if (region) { - const regionDb = await getRegionDb({ regionKey: region.key }) - await withOperationLogging( - async () => await deleteWorkspaceFrom(regionDb)({ workspaceId }), - { - logger: logger.child({ regionKey: region.key }), - operationName: 'deleteWorkspaceFromRegion', - operationDescription: 'Delete workspace from region' - } - ) - } - - await withOperationLogging( - async () => await deleteWorkspaceFrom(db)({ workspaceId }), + await asMultiregionalOperation( + async ({ mainDb, allDbs, emit }) => + deleteWorkspaceFactory({ + deleteWorkspace: replicateFactory(allDbs, repoDeleteWorkspaceFactory), + deleteProjectAndCommits: deleteProjectAndCommitsFactory({ + deleteProject: replicateFactory(allDbs, deleteProjectFactory), + deleteProjectCommits: replicateFactory( + allDbs, + deleteProjectCommitsFactory + ) + }), + deleteAllResourceInvites: deleteAllResourceInvitesFactory({ + db: mainDb + }), + queryAllProjects: queryAllProjectsFactory({ + getExplicitProjects: getExplicitProjects({ db: mainDb }) + }), + deleteSsoProvider: deleteSsoProviderFactory({ db: mainDb }), + emitWorkspaceEvent: emit + })({ workspaceId }), { logger, - operationName: 'deleteWorkspace', - operationDescription: 'Delete workspace' + name: 'delete workspace', + description: 'Delete workspace', + dbs: await getAllRegisteredDbs() } ) @@ -1606,63 +1615,76 @@ export default FF_WORKSPACES_MODULE_ENABLED throw mapAuthToServerError(canMoveToWorkspace.error) } - const moveProjectToWorkspace = commandFactory({ - db, - eventBus, - operationFactory: ({ db, emit }) => + const updatedProject = await asMultiregionalOperation( + ({ mainDb, allDbs, emit }) => moveProjectToWorkspaceFactory({ - getProject: getProjectFactory({ db }), - updateProject: updateProjectFactory({ db: projectDb }), - updateProjectRole: updateStreamRoleAndNotify, - getProjectCollaborators: getStreamCollaboratorsFactory({ db }), + getProject: getProjectFactory({ db: mainDb }), + updateProject: replicateFactory(allDbs, updateProjectFactory), + updateProjectRole: updateStreamRoleAndNotifyFactory({ + isStreamCollaborator: isStreamCollaboratorFactory({ + getStream: getStreamFactory({ db: mainDb }) + }), + addOrUpdateStreamCollaborator: addOrUpdateStreamCollaboratorFactory({ + validateStreamAccess, + getUser: getUserFactory({ db: mainDb }), + grantStreamPermissions: grantStreamPermissionsFactory({ + db: mainDb + }), + getStreamRoles: getStreamRolesFactory({ db: mainDb }), + emitEvent: emit + }), + removeStreamCollaborator + }), + getProjectCollaborators: getStreamCollaboratorsFactory({ db: mainDb }), copyWorkspace: copyWorkspaceFactory({ + // TODO: must be removed when workspace replication is implemented sourceDb: db, targetDb: projectDb }), - getWorkspaceRolesAndSeats: getWorkspaceRolesAndSeatsFactory({ db }), + getWorkspaceRolesAndSeats: getWorkspaceRolesAndSeatsFactory({ + db: mainDb + }), updateWorkspaceRole: addOrUpdateWorkspaceRoleFactory({ - getWorkspaceRoles: getWorkspaceRolesFactory({ db }), - getWorkspaceWithDomains: getWorkspaceWithDomainsFactory({ db }), - findVerifiedEmailsByUserId: findVerifiedEmailsByUserIdFactory({ - db + getWorkspaceRoles: getWorkspaceRolesFactory({ db: mainDb }), + getWorkspaceWithDomains: getWorkspaceWithDomainsFactory({ + db: mainDb }), - upsertWorkspaceRole: upsertWorkspaceRoleFactory({ db }), + findVerifiedEmailsByUserId: findVerifiedEmailsByUserIdFactory({ + db: mainDb + }), + upsertWorkspaceRole: upsertWorkspaceRoleFactory({ db: mainDb }), emitWorkspaceEvent: emit, ensureValidWorkspaceRoleSeat: ensureValidWorkspaceRoleSeatFactory({ - createWorkspaceSeat: createWorkspaceSeatFactory({ db }), - getWorkspaceUserSeat: getWorkspaceUserSeatFactory({ db }), + createWorkspaceSeat: createWorkspaceSeatFactory({ db: mainDb }), + getWorkspaceUserSeat: getWorkspaceUserSeatFactory({ db: mainDb }), getWorkspaceDefaultSeatType: getWorkspaceDefaultSeatTypeFactory({ - getWorkspace: getWorkspaceFactory({ db }) + getWorkspace: getWorkspaceFactory({ db: mainDb }) }), eventEmit: emit }), assignWorkspaceSeat: assignWorkspaceSeatFactory({ - createWorkspaceSeat: createWorkspaceSeatFactory({ db }), + createWorkspaceSeat: createWorkspaceSeatFactory({ db: mainDb }), getWorkspaceRoleForUser: getWorkspaceRoleForUserFactory({ - db + db: mainDb }), eventEmit: emit, - getWorkspaceUserSeat: getWorkspaceUserSeatFactory({ db }) + getWorkspaceUserSeat: getWorkspaceUserSeatFactory({ db: mainDb }) }) }), - createWorkspaceSeat: createWorkspaceSeatFactory({ db }), - getWorkspaceWithPlan: getWorkspaceWithPlanFactory({ db }), - getWorkspaceDomains: getWorkspaceDomainsFactory({ db }), - getUserEmails: findEmailsByUserIdFactory({ db }) - }) - }) - - const updatedProject = await withOperationLogging( - async () => - await moveProjectToWorkspace({ + createWorkspaceSeat: createWorkspaceSeatFactory({ db: mainDb }), + getWorkspaceWithPlan: getWorkspaceWithPlanFactory({ db: mainDb }), + getWorkspaceDomains: getWorkspaceDomainsFactory({ db: mainDb }), + getUserEmails: findEmailsByUserIdFactory({ db: mainDb }) + })({ projectId, workspaceId, movedByUserId: context.userId! }), { logger, - operationName: 'moveProjectToWorkspace', - operationDescription: 'Move project to workspace' + name: 'moveProjectToWorkspace', + description: 'Move project to workspace', + dbs: await getProjectReplicationDbs({ projectId }) } ) diff --git a/packages/server/modules/workspaces/index.ts b/packages/server/modules/workspaces/index.ts index fd30c7392..a722691ab 100644 --- a/packages/server/modules/workspaces/index.ts +++ b/packages/server/modules/workspaces/index.ts @@ -1,4 +1,5 @@ import type cron from 'node-cron' +import type { Logger } from '@/observability/logging' import { moduleLogger } from '@/observability/logging' import { getFeatureFlags } from '@/modules/shared/helpers/envHelper' import { registerOrUpdateScopeFactory } from '@/modules/shared/repositories/scopes' @@ -19,18 +20,21 @@ import { } from '@/modules/core/repositories/scheduledTasks' import { getWorkspacesNonCompleteFactory } from '@/modules/workspaces/repositories/workspaces' import { deleteWorkspacesNonCompleteFactory } from '@/modules/workspaces/services/workspaceCreationState' -import { - deleteStreamFactory, - getExplicitProjects -} from '@/modules/core/repositories/streams' +import { getExplicitProjects } from '@/modules/core/repositories/streams' import { deleteSsoProviderFactory } from '@/modules/workspaces/repositories/sso' -import { getEventBus } from '@/modules/shared/services/eventBus' import { deleteAllResourceInvitesFactory } from '@/modules/serverinvites/repositories/serverInvites' import { deleteWorkspaceFactory as repoDeleteWorkspaceFactory } from '@/modules/workspaces/repositories/workspaces' import { deleteWorkspaceFactory } from '@/modules/workspaces/services/management' import { scheduleUpdateAllWorkspacesTracking } from '@/modules/workspaces/services/tracking' import { getClient } from '@/modules/shared/utils/mixpanel' -import { queryAllProjectsFactory } from '@/modules/core/services/projects' +import { + deleteProjectAndCommitsFactory, + queryAllProjectsFactory +} from '@/modules/core/services/projects' +import { deleteProjectFactory } from '@/modules/core/repositories/projects' +import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' +import { getAllRegisteredDbs } from '@/modules/multiregion/utils/dbSelector' const { FF_WORKSPACES_MODULE_ENABLED, @@ -56,19 +60,39 @@ const scheduleDeleteWorkspacesNonComplete = ({ }: { scheduleExecution: ScheduleExecution }) => { - const deleteWorkspacesNonComplete = deleteWorkspacesNonCompleteFactory({ - getWorkspacesNonComplete: getWorkspacesNonCompleteFactory({ db }), - deleteWorkspace: deleteWorkspaceFactory({ - deleteWorkspace: repoDeleteWorkspaceFactory({ db }), - deleteProject: deleteStreamFactory({ db }), - deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }), - queryAllProjects: queryAllProjectsFactory({ - getExplicitProjects: getExplicitProjects({ db }) - }), - deleteSsoProvider: deleteSsoProviderFactory({ db }), - emitWorkspaceEvent: getEventBus().emit - }) - }) + const deleteWorkspacesNonComplete = async ({ logger }: { logger: Logger }) => + asMultiregionalOperation( + ({ allDbs, mainDb, emit }) => { + const deleteWorkspacesNonComplete = deleteWorkspacesNonCompleteFactory({ + getWorkspacesNonComplete: getWorkspacesNonCompleteFactory({ db: mainDb }), + deleteWorkspace: deleteWorkspaceFactory({ + deleteWorkspace: replicateFactory(allDbs, repoDeleteWorkspaceFactory), + deleteProjectAndCommits: deleteProjectAndCommitsFactory({ + deleteProject: replicateFactory(allDbs, deleteProjectFactory), + deleteProjectCommits: replicateFactory( + allDbs, + deleteProjectCommitsFactory + ) + }), + deleteAllResourceInvites: deleteAllResourceInvitesFactory({ + db: mainDb + }), + queryAllProjects: queryAllProjectsFactory({ + getExplicitProjects: getExplicitProjects({ db: mainDb }) + }), + deleteSsoProvider: deleteSsoProviderFactory({ db: mainDb }), + emitWorkspaceEvent: emit + }) + }) + + return deleteWorkspacesNonComplete({ logger }) + }, + { + logger, + name: 'deleteWorkspacesNonComplete', + dbs: await getAllRegisteredDbs() + } + ) const every30Mins = '*/30 * * * *' return scheduleExecution( diff --git a/packages/server/modules/workspaces/services/management.ts b/packages/server/modules/workspaces/services/management.ts index 1abbcfa9d..1b12069f3 100644 --- a/packages/server/modules/workspaces/services/management.ts +++ b/packages/server/modules/workspaces/services/management.ts @@ -58,12 +58,14 @@ import { chunk, isEmpty, omit } from 'lodash-es' import { userEmailsCompliantWithWorkspaceDomains } from '@/modules/workspaces/domain/logic' import { workspaceRoles as workspaceRoleDefinitions } from '@/modules/workspaces/roles' import { blockedDomains } from '@speckle/shared' -import type { DeleteStreamRecord } from '@/modules/core/domain/streams/operations' import type { DeleteSsoProvider, GetWorkspaceSsoProviderRecord } from '@/modules/workspaces/domain/sso/operations' -import type { QueryAllProjects } from '@/modules/core/domain/projects/operations' +import type { + DeleteProjectAndCommits, + QueryAllProjects +} from '@/modules/core/domain/projects/operations' type WorkspaceCreateArgs = { userId: string @@ -288,14 +290,14 @@ type WorkspaceDeleteArgs = { export const deleteWorkspaceFactory = ({ deleteWorkspace, - deleteProject, + deleteProjectAndCommits, queryAllProjects, deleteAllResourceInvites, deleteSsoProvider, emitWorkspaceEvent }: { deleteWorkspace: DeleteWorkspace - deleteProject: DeleteStreamRecord + deleteProjectAndCommits: DeleteProjectAndCommits queryAllProjects: QueryAllProjects deleteAllResourceInvites: DeleteAllResourceInvites deleteSsoProvider: DeleteSsoProvider @@ -328,7 +330,9 @@ export const deleteWorkspaceFactory = // Workspace delete cascades-deletes stream table rows, but some manual cleanup is required // We re-use `deleteStream` (and re-delete the project) to DRY this manual cleanup for (const projectIdsChunk of chunk(projectIds, 25)) { - await Promise.all(projectIdsChunk.map((projectId) => deleteProject(projectId))) + await Promise.all( + projectIdsChunk.map((projectId) => deleteProjectAndCommits({ projectId })) + ) } await emitWorkspaceEvent({ eventName: WorkspaceEvents.Deleted, diff --git a/packages/server/modules/workspaces/services/projectRegions.ts b/packages/server/modules/workspaces/services/projectRegions.ts index d34a0f081..00a0dd84c 100644 --- a/packages/server/modules/workspaces/services/projectRegions.ts +++ b/packages/server/modules/workspaces/services/projectRegions.ts @@ -1,5 +1,4 @@ import type { GetProject } from '@/modules/core/domain/projects/operations' -import type { UpdateProjectRegionKey } from '@/modules/multiregion/services/projectRegion' import type { CopyProjectAutomations, CopyProjectBlobs, @@ -17,13 +16,13 @@ import type { CountProjectVersions, CountProjectWebhooks, GetAvailableRegions, - UpdateProjectRegion, + MoveProjectToRegion, ValidateProjectRegionCopy } from '@/modules/workspaces/domain/operations' import { ProjectRegionAssignmentError } from '@/modules/workspaces/errors/regions' import { logger } from '@/observability/logging' -export const updateProjectRegionFactory = +export const moveProjectToRegionFactory = (deps: { getProject: GetProject getAvailableRegions: GetAvailableRegions @@ -37,8 +36,7 @@ export const updateProjectRegionFactory = copyProjectWebhooks: CopyProjectWebhooks copyProjectBlobs: CopyProjectBlobs validateProjectRegionCopy: ValidateProjectRegionCopy - updateProjectRegionKey: UpdateProjectRegionKey - }): UpdateProjectRegion => + }): MoveProjectToRegion => async (params) => { const { projectId, regionKey } = params @@ -120,9 +118,6 @@ export const updateProjectRegionFactory = 'Missing data from source project in target region copy after move.' ) } - - // Update project region in db and update relevant caches - return await deps.updateProjectRegionKey({ projectId, regionKey }) } export const validateProjectRegionCopyFactory = diff --git a/packages/server/modules/workspaces/services/projects.ts b/packages/server/modules/workspaces/services/projects.ts index eb444a2f0..20e5dca90 100644 --- a/packages/server/modules/workspaces/services/projects.ts +++ b/packages/server/modules/workspaces/services/projects.ts @@ -30,21 +30,15 @@ import { ProjectNotFoundError } from '@/modules/core/errors/projects' import type { WorkspaceProjectCreateInput } from '@/modules/core/graph/generated/graphql' import { getDb, + getReplicationDbs, getValidDefaultProjectRegionKey } from '@/modules/multiregion/utils/dbSelector' +import { createNewProjectFactory } from '@/modules/core/services/projects' import { - createNewProjectFactory, - waitForRegionProjectFactory -} from '@/modules/core/services/projects' -import { - deleteProjectFactory, - getProjectFactory, storeProjectFactory, storeProjectRoleFactory } from '@/modules/core/repositories/projects' import { mainDb } from '@/db/knex' -import { storeModelFactory } from '@/modules/core/repositories/models' -import { getEventBus } from '@/modules/shared/services/eventBus' import { getWorkspaceFactory, upsertWorkspaceFactory @@ -59,6 +53,8 @@ import type { FindEmailsByUserId } from '@/modules/core/domain/userEmails/operat import { userEmailsCompliantWithWorkspaceDomains } from '@/modules/workspaces/domain/logic' import type { CreateWorkspaceSeat } from '@/modules/gatekeeper/domain/operations' import type { WorkspaceAcl } from '@/modules/workspacesCore/domain/types' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' +import { logger } from '@/observability/logging' type MoveProjectToWorkspaceArgs = { projectId: string @@ -308,6 +304,7 @@ export const validateWorkspaceMemberProjectRoleFactory = } } +// This factory uses the command factory to create a new project in transactional (cross region) so it cannot be wrapped in another transaction export const createWorkspaceProjectFactory = (deps: { getDefaultRegion: GetDefaultRegion }) => async (params: { input: WorkspaceProjectCreateInput; ownerId: string }) => { @@ -334,26 +331,28 @@ export const createWorkspaceProjectFactory = if (!workspace) throw new WorkspaceNotFoundError() await upsertWorkspaceFactory({ db: projectDb })({ workspace }) } + const project = await asMultiregionalOperation( + async ({ allDbs, mainDb, emit }) => { + const createNewProject = createNewProjectFactory({ + // TODO: this goes as event emmits outside (default model) + storeProject: replicateFactory(allDbs, storeProjectFactory), + // THIS MUST GO TO THE MAIN DB + storeProjectRole: storeProjectRoleFactory({ db: mainDb }), + emitEvent: emit + }) - // todo, use the command factory here, but for that, we need to migrate to the event bus - // deps not injected to ensure proper DB injection - const createNewProject = createNewProjectFactory({ - storeProject: storeProjectFactory({ db: projectDb }), - storeModel: storeModelFactory({ db: projectDb }), - // THIS MUST GO TO THE MAIN DB - storeProjectRole: storeProjectRoleFactory({ db }), - waitForRegionProject: waitForRegionProjectFactory({ - getProject: getProjectFactory({ db }), - deleteProject: deleteProjectFactory({ db: projectDb }) - }), - emitEvent: getEventBus().emit - }) - - const project = await createNewProject({ - ...input, - regionKey, - ownerId - }) + return createNewProject({ + ...input, + regionKey, + ownerId + }) + }, + { + dbs: await getReplicationDbs({ regionKey }), + name: 'Create project workspace', + logger + } + ) return project } diff --git a/packages/server/modules/workspaces/tests/integration/projects.graph.spec.ts b/packages/server/modules/workspaces/tests/integration/projects.graph.spec.ts index 99a1f4f0c..78d64c11c 100644 --- a/packages/server/modules/workspaces/tests/integration/projects.graph.spec.ts +++ b/packages/server/modules/workspaces/tests/integration/projects.graph.spec.ts @@ -2,12 +2,7 @@ import { db } from '@/db/knex' import { StreamAcl, Streams } from '@/modules/core/dbSchema' import type { StreamRecord } from '@/modules/core/helpers/types' import { ProjectRecordVisibility } from '@/modules/core/helpers/types' -import { - deleteProjectFactory, - getProjectFactory -} from '@/modules/core/repositories/projects' import { grantStreamPermissionsFactory } from '@/modules/core/repositories/streams' -import { waitForRegionProjectFactory } from '@/modules/core/services/projects' import { WorkspaceSeatType } from '@/modules/gatekeeper/domain/billing' import { getWorkspaceUserSeatsFactory } from '@/modules/gatekeeper/repositories/workspaceSeat' import { getRegionDb } from '@/modules/multiregion/utils/dbSelector' @@ -1010,13 +1005,6 @@ describe('Workspace project GQL CRUD', () => { // Simulate non-main default db region const regionDb = await getRegionDb({ regionKey: 'region1' }) await tables.streams(regionDb).insert(regionalProject) - await waitForRegionProjectFactory({ - getProject: getProjectFactory({ db }), - deleteProject: deleteProjectFactory({ db: regionDb }) - })({ - projectId: regionalProject.id, - regionKey: 'region1' - }) await grantStreamPermissions({ streamId: regionalProject.id, userId: serverAdminUser.id, diff --git a/packages/server/modules/workspaces/tests/integration/workspacesCreationState.spec.ts b/packages/server/modules/workspaces/tests/integration/workspacesCreationState.spec.ts index 52d0551f2..50aa5f697 100644 --- a/packages/server/modules/workspaces/tests/integration/workspacesCreationState.spec.ts +++ b/packages/server/modules/workspaces/tests/integration/workspacesCreationState.spec.ts @@ -14,17 +14,21 @@ import { import { expect } from 'chai' import dayjs from 'dayjs' import { deleteWorkspacesNonCompleteFactory } from '@/modules/workspaces/services/workspaceCreationState' +import type { Logger } from '@/observability/logging' import { logger } from '@/observability/logging' -import { - deleteStreamFactory, - getExplicitProjects -} from '@/modules/core/repositories/streams' +import { getExplicitProjects } from '@/modules/core/repositories/streams' import { deleteSsoProviderFactory } from '@/modules/workspaces/repositories/sso' -import { getEventBus } from '@/modules/shared/services/eventBus' import { deleteAllResourceInvitesFactory } from '@/modules/serverinvites/repositories/serverInvites' import { deleteWorkspaceFactory as repoDeleteWorkspaceFactory } from '@/modules/workspaces/repositories/workspaces' import { deleteWorkspaceFactory } from '@/modules/workspaces/services/management' -import { queryAllProjectsFactory } from '@/modules/core/services/projects' +import { + deleteProjectAndCommitsFactory, + queryAllProjectsFactory +} from '@/modules/core/services/projects' +import { deleteProjectFactory } from '@/modules/core/repositories/projects' +import { deleteProjectCommitsFactory } from '@/modules/core/repositories/commits' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' +import { getAllRegisteredDbs } from '@/modules/multiregion/utils/dbSelector' const updateAWorkspaceCreatedAt = async ( workspaceId: string, @@ -39,19 +43,45 @@ const updateAWorkspaceCreatedAt = async ( describe('WorkspaceCreationState services', () => { const getWorkspacesNonComplete = getWorkspacesNonCompleteFactory({ db }) const getWorkspace = getWorkspaceFactory({ db }) - const deleteWorkspacesNonComplete = deleteWorkspacesNonCompleteFactory({ - getWorkspacesNonComplete, - deleteWorkspace: deleteWorkspaceFactory({ - deleteWorkspace: repoDeleteWorkspaceFactory({ db }), - deleteProject: deleteStreamFactory({ db }), - deleteAllResourceInvites: deleteAllResourceInvitesFactory({ db }), - queryAllProjects: queryAllProjectsFactory({ - getExplicitProjects: getExplicitProjects({ db }) - }), - deleteSsoProvider: deleteSsoProviderFactory({ db }), - emitWorkspaceEvent: getEventBus().emit - }) - }) + const deleteWorkspacesNonComplete = async ({ logger }: { logger: Logger }) => + asMultiregionalOperation( + ({ allDbs, mainDb, emit }) => { + const deleteWorkspacesNonComplete = deleteWorkspacesNonCompleteFactory({ + getWorkspacesNonComplete: getWorkspacesNonCompleteFactory({ db: mainDb }), + deleteWorkspace: deleteWorkspaceFactory({ + deleteWorkspace: async (...input) => { + const [res] = await Promise.all( + allDbs.map((db) => repoDeleteWorkspaceFactory({ db })(...input)) + ) + + return res + }, + deleteProjectAndCommits: deleteProjectAndCommitsFactory({ + deleteProject: replicateFactory(allDbs, deleteProjectFactory), + deleteProjectCommits: replicateFactory( + allDbs, + deleteProjectCommitsFactory + ) + }), + deleteAllResourceInvites: deleteAllResourceInvitesFactory({ + db: mainDb + }), + queryAllProjects: queryAllProjectsFactory({ + getExplicitProjects: getExplicitProjects({ db: mainDb }) + }), + deleteSsoProvider: deleteSsoProviderFactory({ db: mainDb }), + emitWorkspaceEvent: emit + }) + }) + + return deleteWorkspacesNonComplete({ logger }) + }, + { + logger, + name: 'deleteWorkspacesNonComplete', + dbs: await getAllRegisteredDbs() + } + ) let adminUser: BasicTestUser let completeWorkspace: BasicTestWorkspace diff --git a/packages/server/package.json b/packages/server/package.json index 75a6d75bf..29cab51f1 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -28,7 +28,7 @@ "ts-gqlgen": "tsx --import ./esmLoader.js ./bin/gqlgen", "test": "cross-env TSX=true NODE_ENV=test LOG_FILTER=test LOG_PRETTY=true yarn ts-mocha", "test:all-ff": "cross-env ENABLE_ALL_FFS=true yarn test", - "test:multiregion": "cross-env RUN_TESTS_IN_MULTIREGION_MODE=true FF_WORKSPACES_MODULE_ENABLED=true FF_WORKSPACES_MULTI_REGION_ENABLED=true yarn test --grep @multiregion", + "test:multiregion": "cross-env RUN_TESTS_IN_MULTIREGION_MODE=true FF_WORKSPACES_MODULE_ENABLED=true FF_WORKSPACES_MULTI_REGION_ENABLED=true FF_MOVE_PROJECT_REGION_ENABLED=true yarn test --grep @multiregion", "test:no-ff": "cross-env DISABLE_ALL_FFS=true yarn test", "test:coverage": "cross-env NODE_ENV=test LOG_FILTER=test LOG_PRETTY=true c8 yarn test", "test:report": "MOCHA_FILE=reports/test-results.xml yarn test:coverage -- --reporter mocha-multi --reporter-options spec=-,mocha-junit-reporter=reports/test-results.xml", diff --git a/packages/server/scripts/streamObjects.ts b/packages/server/scripts/streamObjects.ts index 823a9ed0e..0280160fd 100644 --- a/packages/server/scripts/streamObjects.ts +++ b/packages/server/scripts/streamObjects.ts @@ -30,7 +30,6 @@ import { import { collectAndValidateCoreTargetsFactory } from '@/modules/serverinvites/services/coreResourceCollection' import { buildCoreInviteEmailContentsFactory } from '@/modules/serverinvites/services/coreEmailContents' import { getEventBus } from '@/modules/shared/services/eventBus' -import { createBranchFactory } from '@/modules/core/repositories/branches' import { getUsersFactory, getUserFactory, @@ -120,6 +119,7 @@ const buildFinalizeProjectInvite = () => getServerInfo }) +// This does not support multiregion const createStream = legacyCreateStreamFactory({ createStreamReturnRecord: createStreamReturnRecordFactory({ inviteUsersToProject: inviteUsersToProjectFactory({ @@ -145,7 +145,6 @@ const createStream = legacyCreateStreamFactory({ }), storeProjectRole: storeProjectRoleFactory({ db }), createStream: createStreamFactory({ db }), - createBranch: createBranchFactory({ db }), emitEvent: getEventBus().emit }) }) diff --git a/packages/server/test/projectHelper.ts b/packages/server/test/projectHelper.ts index faa39b047..a44a02190 100644 --- a/packages/server/test/projectHelper.ts +++ b/packages/server/test/projectHelper.ts @@ -19,7 +19,6 @@ import { getStreamRolesFactory, grantStreamPermissionsFactory } from '@/modules/core/repositories/streams' -import { createBranchFactory } from '@/modules/core/repositories/branches' import { finalizeInvitedServerRegistrationFactory, finalizeResourceInviteFactory @@ -98,6 +97,7 @@ const buildFinalizeProjectInvite = () => getServerInfo }) +// This is not supporting multiregion export const createProject = createStreamReturnRecordFactory({ inviteUsersToProject: inviteUsersToProjectFactory({ createAndSendInvite: createAndSendInviteFactory({ @@ -121,7 +121,6 @@ export const createProject = createStreamReturnRecordFactory({ getUsers }), createStream: createStreamFactory({ db }), - createBranch: createBranchFactory({ db }), storeProjectRole: storeProjectRoleFactory({ db }), emitEvent: getEventBus().emit }) diff --git a/packages/server/test/speckle-helpers/streamHelper.ts b/packages/server/test/speckle-helpers/streamHelper.ts index ba61f1de3..6a2b4c69c 100644 --- a/packages/server/test/speckle-helpers/streamHelper.ts +++ b/packages/server/test/speckle-helpers/streamHelper.ts @@ -2,7 +2,6 @@ import { db } from '@/db/knex' import { StreamAcl } from '@/modules/core/dbSchema' import { mapDbToGqlProjectVisibility } from '@/modules/core/helpers/project' import type { StreamAclRecord, StreamRecord } from '@/modules/core/helpers/types' -import { createBranchFactory } from '@/modules/core/repositories/branches' import { getServerInfoFactory } from '@/modules/core/repositories/server' import { createStreamFactory, @@ -65,9 +64,12 @@ import type { StreamRoles } from '@speckle/shared' import { ensureError, Roles } from '@speckle/shared' import { omit } from 'lodash-es' import { storeProjectRoleFactory } from '@/modules/core/repositories/projects' +import { asMultiregionalOperation, replicateFactory } from '@/modules/shared/command' +import { logger } from '@/observability/logging' +import type { LegacyCreateStream } from '@/modules/core/domain/streams/operations' +import { getReplicationDbs } from '@/modules/multiregion/utils/dbSelector' const getServerInfo = getServerInfoFactory({ db }) -const getUsers = getUsersFactory({ db }) const getUser = getUserFactory({ db }) const getStream = getStreamFactory({ db }) @@ -117,35 +119,42 @@ const buildFinalizeProjectInvite = () => getServerInfo }) -const createStream = legacyCreateStreamFactory({ - createStreamReturnRecord: createStreamReturnRecordFactory({ - inviteUsersToProject: inviteUsersToProjectFactory({ - createAndSendInvite: createAndSendInviteFactory({ - findUserByTarget: findUserByTargetFactory({ db }), - insertInviteAndDeleteOld: insertInviteAndDeleteOldFactory({ db }), - collectAndValidateResourceTargets: collectAndValidateCoreTargetsFactory({ - getStream - }), - buildInviteEmailContents: buildCoreInviteEmailContentsFactory({ - getStream - }), - emitEvent: ({ eventName, payload }) => - getEventBus().emit({ - eventName, - payload +const createStream: LegacyCreateStream = async ( + stream: Parameters[0] & { regionKey?: string } +) => + asMultiregionalOperation( + async ({ allDbs, mainDb, emit }) => + legacyCreateStreamFactory({ + createStreamReturnRecord: createStreamReturnRecordFactory({ + inviteUsersToProject: inviteUsersToProjectFactory({ + createAndSendInvite: createAndSendInviteFactory({ + findUserByTarget: findUserByTargetFactory({ db: mainDb }), + insertInviteAndDeleteOld: insertInviteAndDeleteOldFactory({ db: mainDb }), + collectAndValidateResourceTargets: collectAndValidateCoreTargetsFactory({ + getStream: getStreamFactory({ db: mainDb }) + }), + buildInviteEmailContents: buildCoreInviteEmailContentsFactory({ + getStream: getStreamFactory({ db: mainDb }) + }), + emitEvent: emit, + getUser: getUserFactory({ db: mainDb }), + getServerInfo: getServerInfoFactory({ db: mainDb }), + finalizeInvite: buildFinalizeProjectInvite() + }), + getUsers: getUsersFactory({ db: mainDb }) }), - getUser, - getServerInfo, - finalizeInvite: buildFinalizeProjectInvite() - }), - getUsers - }), - createStream: createStreamFactory({ db }), - createBranch: createBranchFactory({ db }), - storeProjectRole: storeProjectRoleFactory({ db }), - emitEvent: getEventBus().emit - }) -}) + createStream: replicateFactory(allDbs, createStreamFactory), + storeProjectRole: storeProjectRoleFactory({ db: mainDb }), + emitEvent: emit + }) + })(stream), + { + name: 'create stream spec', + logger, + description: 'Creates a new stream', + dbs: await getReplicationDbs({ regionKey: stream.regionKey || null }) + } + ) const validateStreamAccess = validateStreamAccessFactory({ authorizeResolver }) const isStreamCollaborator = isStreamCollaboratorFactory({ @@ -220,6 +229,7 @@ export async function createTestStream>( }, ownerId: owner.id }) + id = newProject.id } else { id = await createStream({