From 7fb3a97ee5fef0cfdca782f8c2a6979fcf822976 Mon Sep 17 00:00:00 2001 From: Alessandro Magionami Date: Fri, 8 Nov 2024 17:42:11 +0100 Subject: [PATCH] chore(multiregion): initial work for models, versions and objects --- .../modules/core/graph/resolvers/models.ts | 204 +++++++++++------- .../modules/core/graph/resolvers/objects.ts | 28 ++- .../modules/core/graph/resolvers/versions.ts | 15 +- 3 files changed, 155 insertions(+), 92 deletions(-) diff --git a/packages/server/modules/core/graph/resolvers/models.ts b/packages/server/modules/core/graph/resolvers/models.ts index 3108812cb..bef995d3a 100644 --- a/packages/server/modules/core/graph/resolvers/models.ts +++ b/packages/server/modules/core/graph/resolvers/models.ts @@ -61,72 +61,7 @@ import { } from '@/modules/core/repositories/streams' import { ModelsEmitter } from '@/modules/core/events/modelsEmitter' import { saveActivityFactory } from '@/modules/activitystream/repositories' - -const markBranchStreamUpdated = markBranchStreamUpdatedFactory({ db }) -const getStream = getStreamFactory({ db }) -const getStreamObjects = getStreamObjectsFactory({ db }) -const getViewerResourceGroups = getViewerResourceGroupsFactory({ - getStreamObjects, - getBranchLatestCommits: getBranchLatestCommitsFactory({ db }), - getStreamBranchesByName: getStreamBranchesByNameFactory({ db }), - getSpecificBranchCommits: getSpecificBranchCommitsFactory({ db }), - getAllBranchCommits: getAllBranchCommitsFactory({ db }) -}) - -const getPaginatedProjectModels = getPaginatedProjectModelsFactory({ - getPaginatedProjectModelsItems: getPaginatedProjectModelsItemsFactory({ db }), - getPaginatedProjectModelsTotalCount: getPaginatedProjectModelsTotalCountFactory({ - db - }) -}) -const getModelTreeItems = getModelTreeItemsFactory({ db }) -const getProjectTopLevelModelsTree = getProjectTopLevelModelsTreeFactory({ - getModelTreeItemsFiltered: getModelTreeItemsFilteredFactory({ db }), - getModelTreeItemsFilteredTotalCount: getModelTreeItemsFilteredTotalCountFactory({ - db - }), - getModelTreeItems, - getModelTreeItemsTotalCount: getModelTreeItemsTotalCountFactory({ db }) -}) -const createBranchAndNotify = createBranchAndNotifyFactory({ - getStreamBranchByName: getStreamBranchByNameFactory({ db }), - createBranch: createBranchFactory({ db }), - addBranchCreatedActivity: addBranchCreatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) -}) -const updateBranchAndNotify = updateBranchAndNotifyFactory({ - getBranchById: getBranchByIdFactory({ db }), - updateBranch: updateBranchFactory({ db }), - addBranchUpdatedActivity: addBranchUpdatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) -}) -const deleteBranchAndNotify = deleteBranchAndNotifyFactory({ - getStream, - getBranchById: getBranchByIdFactory({ db }), - modelsEventsEmitter: ModelsEmitter.emit, - markBranchStreamUpdated, - addBranchDeletedActivity: addBranchDeletedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }), - deleteBranchById: deleteBranchByIdFactory({ db }) -}) - -const getPaginatedBranchCommits = getPaginatedBranchCommitsFactory({ - getSpecificBranchCommits: getSpecificBranchCommitsFactory({ db }), - getPaginatedBranchCommitsItems: getPaginatedBranchCommitsItemsFactory({ db }), - getBranchCommitsTotalCount: getBranchCommitsTotalCountFactory({ db }) -}) -const getPaginatedStreamCommits = legacyGetPaginatedStreamCommitsFactory({ - legacyGetPaginatedStreamCommitsPage: legacyGetPaginatedStreamCommitsPageFactory({ - db - }), - getStreamCommitCount: getStreamCommitCountFactory({ db }) -}) +import { getProjectDbClient } from '@/modules/multiregion/dbSelector' export = { User: { @@ -134,26 +69,45 @@ export = { const authoredOnly = args.authoredOnly return { totalCount: authoredOnly - ? await ctx.loaders.users.getAuthoredCommitCount.load(parent.id) + ? // TODO: make one dataloader for region and sum + await ctx.loaders + .forRegion({ db: regionDB }) + .users.getAuthoredCommitCount.load(parent.id) : await ctx.loaders.users.getStreamCommitCount.load(parent.id) } } }, Project: { async models(parent, args, ctx) { + const projectDB = await getProjectDbClient({ projectId: parent.id }) // If limit=0 & no filter, short-cut full execution and use data loader if (args.limit === 0 && !args.filter) { return { - totalCount: await ctx.loaders.streams.getBranchCount.load(parent.id), + totalCount: await ctx.loaders + .forRegion({ db: projectDB }) + .streams.getBranchCount.load(parent.id), items: [], cursor: null } } + const getPaginatedProjectModels = getPaginatedProjectModelsFactory({ + getPaginatedProjectModelsItems: getPaginatedProjectModelsItemsFactory({ + db: projectDB + }), + getPaginatedProjectModelsTotalCount: getPaginatedProjectModelsTotalCountFactory( + { + db: projectDB + } + ) + }) return await getPaginatedProjectModels(parent.id, args) }, - async model(_parent, args, ctx) { - const model = await ctx.loaders.branches.getById.load(args.id) + async model(parent, args, ctx) { + const projectDB = await getProjectDbClient({ projectId: parent.id }) + const model = await ctx.loaders + .forRegion({ db: projectDB }) + .branches.getById.load(args.id) if (!model) { throw new BranchNotFoundError('Model not found') } @@ -161,8 +115,10 @@ export = { return model }, async modelByName(parent, args, ctx) { - const model = await ctx.loaders.streams.getStreamBranchByName - .forStream(parent.id) + const projectDB = await getProjectDbClient({ projectId: parent.id }) + const model = await ctx.loaders + .forRegion({ db: projectDB }) + .streams.getStreamBranchByName.forStream(parent.id) .load(args.name) if (!model) { throw new BranchNotFoundError('Model not found') @@ -171,9 +127,25 @@ export = { return model }, async modelsTree(parent, args) { + const projectDB = await getProjectDbClient({ projectId: parent.id }) + const getModelTreeItems = getModelTreeItemsFactory({ db: projectDB }) + const getProjectTopLevelModelsTree = getProjectTopLevelModelsTreeFactory({ + getModelTreeItemsFiltered: getModelTreeItemsFilteredFactory({ db: projectDB }), + getModelTreeItemsFilteredTotalCount: getModelTreeItemsFilteredTotalCountFactory( + { + db: projectDB + } + ), + getModelTreeItems, + getModelTreeItemsTotalCount: getModelTreeItemsTotalCountFactory({ + db: projectDB + }) + }) return await getProjectTopLevelModelsTree(parent.id, args) }, async modelChildrenTree(parent, { fullName }) { + const projectDB = await getProjectDbClient({ projectId: parent.id }) + const getModelTreeItems = getModelTreeItemsFactory({ db: projectDB }) return await getModelTreeItems( parent.id, {}, @@ -183,6 +155,15 @@ export = { ) }, async viewerResources(parent, { resourceIdString, loadedVersionsOnly }) { + const projectDB = await getProjectDbClient({ projectId: parent.id }) + const getStreamObjects = getStreamObjectsFactory({ db: projectDB }) + const getViewerResourceGroups = getViewerResourceGroupsFactory({ + getStreamObjects, + getBranchLatestCommits: getBranchLatestCommitsFactory({ db: projectDB }), + getStreamBranchesByName: getStreamBranchesByNameFactory({ db: projectDB }), + getSpecificBranchCommits: getSpecificBranchCommitsFactory({ db: projectDB }), + getAllBranchCommits: getAllBranchCommitsFactory({ db: projectDB }) + }) return await getViewerResourceGroups({ projectId: parent.id, resourceIdString, @@ -190,6 +171,7 @@ export = { }) }, async versions(parent, args, ctx) { + const projectDB = await getProjectDbClient({ projectId: parent.id }) // If limit=0, short-cut full execution and use data loader if (args.limit === 0) { return { @@ -201,6 +183,14 @@ export = { } } + const getPaginatedStreamCommits = legacyGetPaginatedStreamCommitsFactory({ + legacyGetPaginatedStreamCommitsPage: legacyGetPaginatedStreamCommitsPageFactory( + { + db: projectDB + } + ), + getStreamCommitCount: getStreamCommitCountFactory({ db: projectDB }) + }) return await getPaginatedStreamCommits(parent.id, args) } }, @@ -209,11 +199,16 @@ export = { return await ctx.loaders.users.getUser.load(parent.authorId) }, async previewUrl(parent, _args, ctx) { - const latestCommit = await ctx.loaders.branches.getLatestCommit.load(parent.id) + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) + const latestCommit = await ctx.loaders + .forRegion({ db: projectDB }) + .branches.getLatestCommit.load(parent.id) const path = `/preview/${parent.streamId}/commits/${latestCommit?.id || ''}` return latestCommit ? new URL(path, getServerOrigin()).toString() : null }, async childrenTree(parent) { + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) + const getModelTreeItems = getModelTreeItemsFactory({ db: projectDB }) return await getModelTreeItems( parent.streamId, {}, @@ -226,15 +221,25 @@ export = { return last(parent.name.split('/')) }, async versions(parent, args, ctx) { + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) // If limit=0 & no filter, short-cut full execution and use data loader if (!args.filter && args.limit === 0) { return { - totalCount: await ctx.loaders.branches.getCommitCount.load(parent.id), + totalCount: await ctx.loaders + .forRegion({ db: projectDB }) + .branches.getCommitCount.load(parent.id), items: [], cursor: null } } + const getPaginatedBranchCommits = getPaginatedBranchCommitsFactory({ + getSpecificBranchCommits: getSpecificBranchCommitsFactory({ db: projectDB }), + getPaginatedBranchCommitsItems: getPaginatedBranchCommitsItemsFactory({ + db: projectDB + }), + getBranchCommitsTotalCount: getBranchCommitsTotalCountFactory({ db: projectDB }) + }) return await getPaginatedBranchCommits({ branchId: parent.id, cursor: args.cursor, @@ -243,10 +248,13 @@ export = { }) }, async version(parent, args, ctx) { - const version = await ctx.loaders.branches.getBranchCommit.load({ - branchId: parent.id, - commitId: args.id - }) + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) + const version = await ctx.loaders + .forRegion({ db: projectDB }) + .branches.getBranchCommit.load({ + branchId: parent.id, + commitId: args.id + }) if (!version) { throw new CommitNotFoundError('Version not found') } @@ -256,11 +264,15 @@ export = { }, ModelsTreeItem: { async model(parent, _args, ctx) { - return await ctx.loaders.streams.getStreamBranchByName - .forStream(parent.projectId) + const projectDB = await getProjectDbClient({ projectId: parent.projectId }) + return await ctx.loaders + .forRegion({ db: projectDB }) + .streams.getStreamBranchByName.forStream(parent.projectId) .load(parent.fullName) }, async children(parent) { + const projectDB = await getProjectDbClient({ projectId: parent.projectId }) + const getModelTreeItems = getModelTreeItemsFactory({ db: projectDB }) return await getModelTreeItems( parent.projectId, {}, @@ -281,6 +293,15 @@ export = { Roles.Stream.Contributor, ctx.resourceAccessRules ) + const projectDB = await getProjectDbClient({ projectId: args.input.projectId }) + const createBranchAndNotify = createBranchAndNotifyFactory({ + getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDB }), + createBranch: createBranchFactory({ db: projectDB }), + addBranchCreatedActivity: addBranchCreatedActivityFactory({ + saveActivity: saveActivityFactory({ db }), + publish + }) + }) return await createBranchAndNotify(args.input, ctx.userId!) }, async update(_parent, args, ctx) { @@ -290,6 +311,15 @@ export = { Roles.Stream.Contributor, ctx.resourceAccessRules ) + const projectDB = await getProjectDbClient({ projectId: args.input.projectId }) + const updateBranchAndNotify = updateBranchAndNotifyFactory({ + getBranchById: getBranchByIdFactory({ db: projectDB }), + updateBranch: updateBranchFactory({ db: projectDB }), + addBranchUpdatedActivity: addBranchUpdatedActivityFactory({ + saveActivity: saveActivityFactory({ db }), + publish + }) + }) return await updateBranchAndNotify(args.input, ctx.userId!) }, async delete(_parent, args, ctx) { @@ -299,6 +329,20 @@ export = { Roles.Stream.Contributor, ctx.resourceAccessRules ) + const projectDB = await getProjectDbClient({ projectId: args.input.projectId }) + const markBranchStreamUpdated = markBranchStreamUpdatedFactory({ db: projectDB }) + const getStream = getStreamFactory({ db }) + const deleteBranchAndNotify = deleteBranchAndNotifyFactory({ + getStream, + getBranchById: getBranchByIdFactory({ db: projectDB }), + modelsEventsEmitter: ModelsEmitter.emit, + markBranchStreamUpdated, + addBranchDeletedActivity: addBranchDeletedActivityFactory({ + saveActivity: saveActivityFactory({ db }), + publish + }), + deleteBranchById: deleteBranchByIdFactory({ db: projectDB }) + }) return await deleteBranchAndNotify(args.input, ctx.userId!) } }, diff --git a/packages/server/modules/core/graph/resolvers/objects.ts b/packages/server/modules/core/graph/resolvers/objects.ts index ee0b80599..90b963d08 100644 --- a/packages/server/modules/core/graph/resolvers/objects.ts +++ b/packages/server/modules/core/graph/resolvers/objects.ts @@ -10,19 +10,19 @@ import { } from '@/modules/core/repositories/objects' import { db } from '@/db/knex' import { createObjectsFactory } from '@/modules/core/services/objects/management' +import { getProjectDbClient } from '@/modules/multiregion/dbSelector' -const getObject = getObjectFactory({ db }) -const createObjects = createObjectsFactory({ - storeObjectsIfNotFoundFactory: storeObjectsIfNotFoundFactory({ db }), - storeClosuresIfNotFound: storeClosuresIfNotFoundFactory({ db }) -}) -const getObjectChildren = getObjectChildrenFactory({ db }) -const getObjectChildrenQuery = getObjectChildrenQueryFactory({ db }) -type GetObjectChildrenQueryParams = Parameters[0] +type GetObjectChildrenQueryParams = Parameters< + ReturnType +>[0] const getStreamObject: NonNullable['object'] = async function object(parent, args) { - return (await getObject(args.id, parent.id)) || null + return ( + (await getObjectFactory({ + db: await getProjectDbClient({ projectId: parent.id }) + })(args.id, parent.id)) || null + ) } export = { @@ -34,8 +34,10 @@ export = { }, Object: { async children(parent, args) { + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) // The simple query branch if (!args.query && !args.orderBy) { + const getObjectChildren = getObjectChildrenFactory({ db }) const result = await getObjectChildren({ streamId: parent.streamId, objectId: parent.id, @@ -60,6 +62,7 @@ export = { } } + const getObjectChildrenQuery = getObjectChildrenQueryFactory({ db: projectDB }) // The complex query branch const result = await getObjectChildrenQuery({ streamId: parent.streamId, @@ -98,6 +101,13 @@ export = { context.resourceAccessRules ) + const projectDB = await getProjectDbClient({ + projectId: args.objectInput.streamId + }) + const createObjects = createObjectsFactory({ + storeObjectsIfNotFoundFactory: storeObjectsIfNotFoundFactory({ db: projectDB }), + storeClosuresIfNotFound: storeClosuresIfNotFoundFactory({ db: projectDB }) + }) const ids = await createObjects({ streamId: args.objectInput.streamId, objects: args.objectInput.objects.filter(isNonNullable) diff --git a/packages/server/modules/core/graph/resolvers/versions.ts b/packages/server/modules/core/graph/resolvers/versions.ts index 19c7f0d75..7366f43f7 100644 --- a/packages/server/modules/core/graph/resolvers/versions.ts +++ b/packages/server/modules/core/graph/resolvers/versions.ts @@ -56,6 +56,7 @@ import { } from '@/modules/activitystream/services/commitActivity' import { getObjectFactory } from '@/modules/core/repositories/objects' import { saveActivityFactory } from '@/modules/activitystream/repositories' +import { getProjectDbClient } from '@/modules/multiregion/dbSelector' const markCommitStreamUpdated = markCommitStreamUpdatedFactory({ db }) const getCommitStream = getCommitStreamFactory({ db }) @@ -117,8 +118,10 @@ const batchDeleteCommits = batchDeleteCommitsFactory({ export = { Project: { async version(parent, args, ctx) { - const version = await ctx.loaders.streams.getStreamCommit - .forStream(parent.id) + const projectDB = await getProjectDbClient({ projectId: parent.id }) + const version = await ctx.loaders + .forRegion({ db: projectDB }) + .streams.getStreamCommit.forStream(parent.id) .load(args.id) if (!version) { throw new CommitNotFoundError('Version not found') @@ -134,7 +137,10 @@ export = { return (await ctx.loaders.users.getUser.load(author)) || null }, async model(parent, _args, ctx) { - return await ctx.loaders.commits.getCommitBranch.load(parent.id) + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) + return await ctx.loaders + .forRegion({ db: projectDB }) + .commits.getCommitBranch.load(parent.id) }, async previewUrl(parent, _args, ctx) { const stream = await ctx.loaders.commits.getCommitStream.load(parent.id) @@ -147,13 +153,16 @@ export = { }, VersionMutations: { async moveToModel(_parent, args, ctx) { + // TODO: how to get streamId here? return await batchMoveCommits(args.input, ctx.userId!) }, async delete(_parent, args, ctx) { + // TODO: how to get streamId here? await batchDeleteCommits(args.input, ctx.userId!) return true }, async update(_parent, args, ctx) { + // TODO: how to get streamId here? const stream = await ctx.loaders.commits.getCommitStream.load( args.input.versionId )