From 23445f91e267b8dea4db4658bdc40372b1e86771 Mon Sep 17 00:00:00 2001 From: Alessandro Magionami Date: Fri, 8 Nov 2024 17:40:50 +0100 Subject: [PATCH 01/10] chore(multiregion): add streamId to versions --- .../modules/core/domain/commits/operations.ts | 4 ++-- .../modules/core/graph/dataloaders/index.ts | 8 +++++-- .../server/modules/core/helpers/graphTypes.ts | 3 ++- .../modules/core/repositories/branches.ts | 2 +- .../modules/core/repositories/commits.ts | 20 ++++++++++++---- .../core/services/commit/management.ts | 24 +++++++++---------- 6 files changed, 38 insertions(+), 23 deletions(-) diff --git a/packages/server/modules/core/domain/commits/operations.ts b/packages/server/modules/core/domain/commits/operations.ts index b853d5f94..56f4de478 100644 --- a/packages/server/modules/core/domain/commits/operations.ts +++ b/packages/server/modules/core/domain/commits/operations.ts @@ -74,7 +74,7 @@ export type CreateCommitByBranchId = ( options?: Partial<{ notify: boolean }> -) => Promise +) => Promise> export type CreateCommitByBranchName = ( params: NullableKeysToOptional<{ @@ -109,7 +109,7 @@ export type InsertStreamCommits = ( export type UpdateCommitAndNotify = ( params: CommitUpdateInput | UpdateVersionInput, userId: string -) => Promise +) => Promise> export type GetCommitBranches = (commitIds: string[]) => Promise diff --git a/packages/server/modules/core/graph/dataloaders/index.ts b/packages/server/modules/core/graph/dataloaders/index.ts index dc7cba6bb..0db7d5b90 100644 --- a/packages/server/modules/core/graph/dataloaders/index.ts +++ b/packages/server/modules/core/graph/dataloaders/index.ts @@ -89,6 +89,7 @@ import { getUsersFactory, UserWithOptionalRole } from '@/modules/core/repositories/users' +import { CommitWithStreamBranchMetadata } from '@/modules/core/domain/commits/types' declare module '@/modules/core/loaders' { interface ModularizedDataLoaders extends ReturnType {} @@ -170,14 +171,17 @@ const dataLoadersDefinition = defineRequestDataloaders( * thus its own query. */ getStreamCommit: (() => { - type CommitDataLoader = DataLoader> + type CommitDataLoader = DataLoader< + string, + Nullable + > const streamCommitLoaders = new Map() return { clearAll: () => streamCommitLoaders.clear(), forStream(streamId: string): CommitDataLoader { let loader = streamCommitLoaders.get(streamId) if (!loader) { - loader = createLoader>( + loader = createLoader>( async (commitIds) => { const results = keyBy( await getCommits(commitIds.slice(), { streamId }), diff --git a/packages/server/modules/core/helpers/graphTypes.ts b/packages/server/modules/core/helpers/graphTypes.ts index b8a0f7fed..4e0b0507c 100644 --- a/packages/server/modules/core/helpers/graphTypes.ts +++ b/packages/server/modules/core/helpers/graphTypes.ts @@ -1,4 +1,5 @@ import { + CommitWithStreamBranchMetadata, LegacyStreamCommit, LegacyUserCommit } from '@/modules/core/domain/commits/types' @@ -43,7 +44,7 @@ export type ProjectGraphQLReturn = StreamGraphQLReturn export type ModelGraphQLReturn = BranchRecord -export type VersionGraphQLReturn = CommitRecord +export type VersionGraphQLReturn = Omit export type LimitedUserGraphQLReturn = Omit< LimitedUser, diff --git a/packages/server/modules/core/repositories/branches.ts b/packages/server/modules/core/repositories/branches.ts index aec99e667..52aa58a4b 100644 --- a/packages/server/modules/core/repositories/branches.ts +++ b/packages/server/modules/core/repositories/branches.ts @@ -623,7 +623,7 @@ export const getModelTreeItemsFactory = options ) - const finalQuery = knex.from(query.as('sq1')) + const finalQuery = deps.db.from(query.as('sq1')) finalQuery.limit(limit) if (args.cursor) { diff --git a/packages/server/modules/core/repositories/commits.ts b/packages/server/modules/core/repositories/commits.ts index 771a9d925..ad357c929 100644 --- a/packages/server/modules/core/repositories/commits.ts +++ b/packages/server/modules/core/repositories/commits.ts @@ -269,13 +269,16 @@ export const getSpecificBranchCommitsFactory = const q = tables .commits(deps.db) - .select>([ + .select>>([ ...Commits.cols, - BranchCommits.col.branchId + knex.raw(`(array_agg(??))[1] as "branchId"`, [BranchCommits.col.branchId]), + knex.raw(`(array_agg(??))[1] as "streamId"`, [StreamCommits.col.streamId]) ]) .innerJoin(BranchCommits.name, BranchCommits.col.commitId, Commits.col.id) + .innerJoin(StreamCommits.name, StreamCommits.col.commitId, Commits.col.id) .whereIn(Commits.col.id, commitIds) .whereIn(BranchCommits.col.branchId, branchIds) + .groupBy(Commits.col.id) const queryResults = await q const results: Array = [] @@ -294,13 +297,20 @@ export const getSpecificBranchCommitsFactory = const getPaginatedBranchCommitsBaseQueryFactory = (deps: { db: Knex }) => - (params: PaginatedBranchCommitsBaseParams) => { + []>( + params: PaginatedBranchCommitsBaseParams + ) => { const { branchId, filter } = params const q = tables .commits(deps.db) - .select(Commits.cols) + .select([ + ...Commits.cols, + knex.raw(`(array_agg(??))[1] as "branchId"`, [BranchCommits.col.branchId]), + knex.raw(`(array_agg(??))[1] as "streamId"`, [StreamCommits.col.streamId]) + ]) .innerJoin(BranchCommits.name, BranchCommits.col.commitId, Commits.col.id) + .innerJoin(StreamCommits.name, StreamCommits.col.commitId, Commits.col.id) .innerJoin(Branches.name, Branches.col.id, BranchCommits.col.branchId) .where(Branches.col.id, branchId) .groupBy(Commits.col.id) @@ -338,7 +348,7 @@ export const getBranchCommitsTotalCountFactory = (deps: { db: Knex }): GetBranchCommitsTotalCount => async (params: PaginatedBranchCommitsBaseParams) => { const baseQ = getPaginatedBranchCommitsBaseQueryFactory(deps)(params) - const q = knex.count<{ count: string }[]>().from(baseQ.as('sq1')) + const q = deps.db.count<{ count: string }[]>().from(baseQ.as('sq1')) const [res] = await q return parseInt(res?.count || '0') diff --git a/packages/server/modules/core/services/commit/management.ts b/packages/server/modules/core/services/commit/management.ts index 9d627b1b6..7e179f57f 100644 --- a/packages/server/modules/core/services/commit/management.ts +++ b/packages/server/modules/core/services/commit/management.ts @@ -47,7 +47,7 @@ import { MarkReceivedVersionInput, UpdateVersionInput } from '@/modules/core/graph/generated/graphql' -import { CommitRecord } from '@/modules/core/helpers/types' +import { BranchRecord, CommitRecord } from '@/modules/core/helpers/types' import { getCommitFactory } from '@/modules/core/repositories/commits' import { ensureError, Roles } from '@speckle/shared' import { has } from 'lodash' @@ -172,7 +172,7 @@ export const createCommitByBranchIdFactory = : []) ]) - return commit + return { ...commit, streamId, branchId } } export const createCommitByBranchNameFactory = @@ -288,21 +288,20 @@ export const updateCommitAndNotifyFactory = ) } + let branch: BranchRecord | undefined = await deps.getCommitBranch(commitId) if (newBranchName) { try { - const [newBranch, oldBranch] = await Promise.all([ - deps.getStreamBranchByName(streamId, newBranchName), - deps.getCommitBranch(commitId) - ]) + const newBranch = await deps.getStreamBranchByName(streamId, newBranchName) - if (!newBranch || !oldBranch) { + if (!newBranch || !branch) { throw new Error("Couldn't resolve branch") } if (!commit) { throw new Error("Couldn't find commit") } - await deps.switchCommitBranch(commitId, newBranch.id, oldBranch.id) + await deps.switchCommitBranch(commitId, newBranch.id, branch.id) + branch = newBranch } catch (e) { throw new CommitUpdateError('Failed to update commit branch', { cause: ensureError(e), @@ -326,13 +325,14 @@ export const updateCommitAndNotifyFactory = newCommit }) - await Promise.all([ - deps.markCommitStreamUpdated(commit.id), - deps.markCommitBranchUpdated(commit.id) + const [updatedBranch] = await Promise.all([ + deps.markCommitBranchUpdated(commit.id), + deps.markCommitStreamUpdated(commit.id) ]) + branch = updatedBranch } - return newCommit + return { ...newCommit, streamId: stream.id, branchId: branch!.id } } export const deleteCommitAndNotifyFactory = From 2ad6b02db9f5a55d61c6d2d61f0d31fc4b56032c Mon Sep 17 00:00:00 2001 From: Alessandro Magionami Date: Fri, 8 Nov 2024 17:41:34 +0100 Subject: [PATCH 02/10] chore(multiregion): fix project resolver --- packages/server/modules/core/graph/resolvers/projects.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/server/modules/core/graph/resolvers/projects.ts b/packages/server/modules/core/graph/resolvers/projects.ts index aba6bc2cf..8faecf564 100644 --- a/packages/server/modules/core/graph/resolvers/projects.ts +++ b/packages/server/modules/core/graph/resolvers/projects.ts @@ -354,7 +354,12 @@ export = { })) }, async sourceApps(parent, _args, ctx) { - return ctx.loaders.streams.getSourceApps.load(parent.id) || [] + const projectDB = await getProjectDbClient({ projectId: parent.id }) + return ( + ctx.loaders + .forRegion({ db: projectDB }) + .streams.getSourceApps.load(parent.id) || [] + ) }, async visibility(parent) { From 7fb3a97ee5fef0cfdca782f8c2a6979fcf822976 Mon Sep 17 00:00:00 2001 From: Alessandro Magionami Date: Fri, 8 Nov 2024 17:42:11 +0100 Subject: [PATCH 03/10] chore(multiregion): initial work for models, versions and objects --- .../modules/core/graph/resolvers/models.ts | 204 +++++++++++------- .../modules/core/graph/resolvers/objects.ts | 28 ++- .../modules/core/graph/resolvers/versions.ts | 15 +- 3 files changed, 155 insertions(+), 92 deletions(-) diff --git a/packages/server/modules/core/graph/resolvers/models.ts b/packages/server/modules/core/graph/resolvers/models.ts index 3108812cb..bef995d3a 100644 --- a/packages/server/modules/core/graph/resolvers/models.ts +++ b/packages/server/modules/core/graph/resolvers/models.ts @@ -61,72 +61,7 @@ import { } from '@/modules/core/repositories/streams' import { ModelsEmitter } from '@/modules/core/events/modelsEmitter' import { saveActivityFactory } from '@/modules/activitystream/repositories' - -const markBranchStreamUpdated = markBranchStreamUpdatedFactory({ db }) -const getStream = getStreamFactory({ db }) -const getStreamObjects = getStreamObjectsFactory({ db }) -const getViewerResourceGroups = getViewerResourceGroupsFactory({ - getStreamObjects, - getBranchLatestCommits: getBranchLatestCommitsFactory({ db }), - getStreamBranchesByName: getStreamBranchesByNameFactory({ db }), - getSpecificBranchCommits: getSpecificBranchCommitsFactory({ db }), - getAllBranchCommits: getAllBranchCommitsFactory({ db }) -}) - -const getPaginatedProjectModels = getPaginatedProjectModelsFactory({ - getPaginatedProjectModelsItems: getPaginatedProjectModelsItemsFactory({ db }), - getPaginatedProjectModelsTotalCount: getPaginatedProjectModelsTotalCountFactory({ - db - }) -}) -const getModelTreeItems = getModelTreeItemsFactory({ db }) -const getProjectTopLevelModelsTree = getProjectTopLevelModelsTreeFactory({ - getModelTreeItemsFiltered: getModelTreeItemsFilteredFactory({ db }), - getModelTreeItemsFilteredTotalCount: getModelTreeItemsFilteredTotalCountFactory({ - db - }), - getModelTreeItems, - getModelTreeItemsTotalCount: getModelTreeItemsTotalCountFactory({ db }) -}) -const createBranchAndNotify = createBranchAndNotifyFactory({ - getStreamBranchByName: getStreamBranchByNameFactory({ db }), - createBranch: createBranchFactory({ db }), - addBranchCreatedActivity: addBranchCreatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) -}) -const updateBranchAndNotify = updateBranchAndNotifyFactory({ - getBranchById: getBranchByIdFactory({ db }), - updateBranch: updateBranchFactory({ db }), - addBranchUpdatedActivity: addBranchUpdatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) -}) -const deleteBranchAndNotify = deleteBranchAndNotifyFactory({ - getStream, - getBranchById: getBranchByIdFactory({ db }), - modelsEventsEmitter: ModelsEmitter.emit, - markBranchStreamUpdated, - addBranchDeletedActivity: addBranchDeletedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }), - deleteBranchById: deleteBranchByIdFactory({ db }) -}) - -const getPaginatedBranchCommits = getPaginatedBranchCommitsFactory({ - getSpecificBranchCommits: getSpecificBranchCommitsFactory({ db }), - getPaginatedBranchCommitsItems: getPaginatedBranchCommitsItemsFactory({ db }), - getBranchCommitsTotalCount: getBranchCommitsTotalCountFactory({ db }) -}) -const getPaginatedStreamCommits = legacyGetPaginatedStreamCommitsFactory({ - legacyGetPaginatedStreamCommitsPage: legacyGetPaginatedStreamCommitsPageFactory({ - db - }), - getStreamCommitCount: getStreamCommitCountFactory({ db }) -}) +import { getProjectDbClient } from '@/modules/multiregion/dbSelector' export = { User: { @@ -134,26 +69,45 @@ export = { const authoredOnly = args.authoredOnly return { totalCount: authoredOnly - ? await ctx.loaders.users.getAuthoredCommitCount.load(parent.id) + ? // TODO: make one dataloader for region and sum + await ctx.loaders + .forRegion({ db: regionDB }) + .users.getAuthoredCommitCount.load(parent.id) : await ctx.loaders.users.getStreamCommitCount.load(parent.id) } } }, Project: { async models(parent, args, ctx) { + const projectDB = await getProjectDbClient({ projectId: parent.id }) // If limit=0 & no filter, short-cut full execution and use data loader if (args.limit === 0 && !args.filter) { return { - totalCount: await ctx.loaders.streams.getBranchCount.load(parent.id), + totalCount: await ctx.loaders + .forRegion({ db: projectDB }) + .streams.getBranchCount.load(parent.id), items: [], cursor: null } } + const getPaginatedProjectModels = getPaginatedProjectModelsFactory({ + getPaginatedProjectModelsItems: getPaginatedProjectModelsItemsFactory({ + db: projectDB + }), + getPaginatedProjectModelsTotalCount: getPaginatedProjectModelsTotalCountFactory( + { + db: projectDB + } + ) + }) return await getPaginatedProjectModels(parent.id, args) }, - async model(_parent, args, ctx) { - const model = await ctx.loaders.branches.getById.load(args.id) + async model(parent, args, ctx) { + const projectDB = await getProjectDbClient({ projectId: parent.id }) + const model = await ctx.loaders + .forRegion({ db: projectDB }) + .branches.getById.load(args.id) if (!model) { throw new BranchNotFoundError('Model not found') } @@ -161,8 +115,10 @@ export = { return model }, async modelByName(parent, args, ctx) { - const model = await ctx.loaders.streams.getStreamBranchByName - .forStream(parent.id) + const projectDB = await getProjectDbClient({ projectId: parent.id }) + const model = await ctx.loaders + .forRegion({ db: projectDB }) + .streams.getStreamBranchByName.forStream(parent.id) .load(args.name) if (!model) { throw new BranchNotFoundError('Model not found') @@ -171,9 +127,25 @@ export = { return model }, async modelsTree(parent, args) { + const projectDB = await getProjectDbClient({ projectId: parent.id }) + const getModelTreeItems = getModelTreeItemsFactory({ db: projectDB }) + const getProjectTopLevelModelsTree = getProjectTopLevelModelsTreeFactory({ + getModelTreeItemsFiltered: getModelTreeItemsFilteredFactory({ db: projectDB }), + getModelTreeItemsFilteredTotalCount: getModelTreeItemsFilteredTotalCountFactory( + { + db: projectDB + } + ), + getModelTreeItems, + getModelTreeItemsTotalCount: getModelTreeItemsTotalCountFactory({ + db: projectDB + }) + }) return await getProjectTopLevelModelsTree(parent.id, args) }, async modelChildrenTree(parent, { fullName }) { + const projectDB = await getProjectDbClient({ projectId: parent.id }) + const getModelTreeItems = getModelTreeItemsFactory({ db: projectDB }) return await getModelTreeItems( parent.id, {}, @@ -183,6 +155,15 @@ export = { ) }, async viewerResources(parent, { resourceIdString, loadedVersionsOnly }) { + const projectDB = await getProjectDbClient({ projectId: parent.id }) + const getStreamObjects = getStreamObjectsFactory({ db: projectDB }) + const getViewerResourceGroups = getViewerResourceGroupsFactory({ + getStreamObjects, + getBranchLatestCommits: getBranchLatestCommitsFactory({ db: projectDB }), + getStreamBranchesByName: getStreamBranchesByNameFactory({ db: projectDB }), + getSpecificBranchCommits: getSpecificBranchCommitsFactory({ db: projectDB }), + getAllBranchCommits: getAllBranchCommitsFactory({ db: projectDB }) + }) return await getViewerResourceGroups({ projectId: parent.id, resourceIdString, @@ -190,6 +171,7 @@ export = { }) }, async versions(parent, args, ctx) { + const projectDB = await getProjectDbClient({ projectId: parent.id }) // If limit=0, short-cut full execution and use data loader if (args.limit === 0) { return { @@ -201,6 +183,14 @@ export = { } } + const getPaginatedStreamCommits = legacyGetPaginatedStreamCommitsFactory({ + legacyGetPaginatedStreamCommitsPage: legacyGetPaginatedStreamCommitsPageFactory( + { + db: projectDB + } + ), + getStreamCommitCount: getStreamCommitCountFactory({ db: projectDB }) + }) return await getPaginatedStreamCommits(parent.id, args) } }, @@ -209,11 +199,16 @@ export = { return await ctx.loaders.users.getUser.load(parent.authorId) }, async previewUrl(parent, _args, ctx) { - const latestCommit = await ctx.loaders.branches.getLatestCommit.load(parent.id) + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) + const latestCommit = await ctx.loaders + .forRegion({ db: projectDB }) + .branches.getLatestCommit.load(parent.id) const path = `/preview/${parent.streamId}/commits/${latestCommit?.id || ''}` return latestCommit ? new URL(path, getServerOrigin()).toString() : null }, async childrenTree(parent) { + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) + const getModelTreeItems = getModelTreeItemsFactory({ db: projectDB }) return await getModelTreeItems( parent.streamId, {}, @@ -226,15 +221,25 @@ export = { return last(parent.name.split('/')) }, async versions(parent, args, ctx) { + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) // If limit=0 & no filter, short-cut full execution and use data loader if (!args.filter && args.limit === 0) { return { - totalCount: await ctx.loaders.branches.getCommitCount.load(parent.id), + totalCount: await ctx.loaders + .forRegion({ db: projectDB }) + .branches.getCommitCount.load(parent.id), items: [], cursor: null } } + const getPaginatedBranchCommits = getPaginatedBranchCommitsFactory({ + getSpecificBranchCommits: getSpecificBranchCommitsFactory({ db: projectDB }), + getPaginatedBranchCommitsItems: getPaginatedBranchCommitsItemsFactory({ + db: projectDB + }), + getBranchCommitsTotalCount: getBranchCommitsTotalCountFactory({ db: projectDB }) + }) return await getPaginatedBranchCommits({ branchId: parent.id, cursor: args.cursor, @@ -243,10 +248,13 @@ export = { }) }, async version(parent, args, ctx) { - const version = await ctx.loaders.branches.getBranchCommit.load({ - branchId: parent.id, - commitId: args.id - }) + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) + const version = await ctx.loaders + .forRegion({ db: projectDB }) + .branches.getBranchCommit.load({ + branchId: parent.id, + commitId: args.id + }) if (!version) { throw new CommitNotFoundError('Version not found') } @@ -256,11 +264,15 @@ export = { }, ModelsTreeItem: { async model(parent, _args, ctx) { - return await ctx.loaders.streams.getStreamBranchByName - .forStream(parent.projectId) + const projectDB = await getProjectDbClient({ projectId: parent.projectId }) + return await ctx.loaders + .forRegion({ db: projectDB }) + .streams.getStreamBranchByName.forStream(parent.projectId) .load(parent.fullName) }, async children(parent) { + const projectDB = await getProjectDbClient({ projectId: parent.projectId }) + const getModelTreeItems = getModelTreeItemsFactory({ db: projectDB }) return await getModelTreeItems( parent.projectId, {}, @@ -281,6 +293,15 @@ export = { Roles.Stream.Contributor, ctx.resourceAccessRules ) + const projectDB = await getProjectDbClient({ projectId: args.input.projectId }) + const createBranchAndNotify = createBranchAndNotifyFactory({ + getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDB }), + createBranch: createBranchFactory({ db: projectDB }), + addBranchCreatedActivity: addBranchCreatedActivityFactory({ + saveActivity: saveActivityFactory({ db }), + publish + }) + }) return await createBranchAndNotify(args.input, ctx.userId!) }, async update(_parent, args, ctx) { @@ -290,6 +311,15 @@ export = { Roles.Stream.Contributor, ctx.resourceAccessRules ) + const projectDB = await getProjectDbClient({ projectId: args.input.projectId }) + const updateBranchAndNotify = updateBranchAndNotifyFactory({ + getBranchById: getBranchByIdFactory({ db: projectDB }), + updateBranch: updateBranchFactory({ db: projectDB }), + addBranchUpdatedActivity: addBranchUpdatedActivityFactory({ + saveActivity: saveActivityFactory({ db }), + publish + }) + }) return await updateBranchAndNotify(args.input, ctx.userId!) }, async delete(_parent, args, ctx) { @@ -299,6 +329,20 @@ export = { Roles.Stream.Contributor, ctx.resourceAccessRules ) + const projectDB = await getProjectDbClient({ projectId: args.input.projectId }) + const markBranchStreamUpdated = markBranchStreamUpdatedFactory({ db: projectDB }) + const getStream = getStreamFactory({ db }) + const deleteBranchAndNotify = deleteBranchAndNotifyFactory({ + getStream, + getBranchById: getBranchByIdFactory({ db: projectDB }), + modelsEventsEmitter: ModelsEmitter.emit, + markBranchStreamUpdated, + addBranchDeletedActivity: addBranchDeletedActivityFactory({ + saveActivity: saveActivityFactory({ db }), + publish + }), + deleteBranchById: deleteBranchByIdFactory({ db: projectDB }) + }) return await deleteBranchAndNotify(args.input, ctx.userId!) } }, diff --git a/packages/server/modules/core/graph/resolvers/objects.ts b/packages/server/modules/core/graph/resolvers/objects.ts index ee0b80599..90b963d08 100644 --- a/packages/server/modules/core/graph/resolvers/objects.ts +++ b/packages/server/modules/core/graph/resolvers/objects.ts @@ -10,19 +10,19 @@ import { } from '@/modules/core/repositories/objects' import { db } from '@/db/knex' import { createObjectsFactory } from '@/modules/core/services/objects/management' +import { getProjectDbClient } from '@/modules/multiregion/dbSelector' -const getObject = getObjectFactory({ db }) -const createObjects = createObjectsFactory({ - storeObjectsIfNotFoundFactory: storeObjectsIfNotFoundFactory({ db }), - storeClosuresIfNotFound: storeClosuresIfNotFoundFactory({ db }) -}) -const getObjectChildren = getObjectChildrenFactory({ db }) -const getObjectChildrenQuery = getObjectChildrenQueryFactory({ db }) -type GetObjectChildrenQueryParams = Parameters[0] +type GetObjectChildrenQueryParams = Parameters< + ReturnType +>[0] const getStreamObject: NonNullable['object'] = async function object(parent, args) { - return (await getObject(args.id, parent.id)) || null + return ( + (await getObjectFactory({ + db: await getProjectDbClient({ projectId: parent.id }) + })(args.id, parent.id)) || null + ) } export = { @@ -34,8 +34,10 @@ export = { }, Object: { async children(parent, args) { + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) // The simple query branch if (!args.query && !args.orderBy) { + const getObjectChildren = getObjectChildrenFactory({ db }) const result = await getObjectChildren({ streamId: parent.streamId, objectId: parent.id, @@ -60,6 +62,7 @@ export = { } } + const getObjectChildrenQuery = getObjectChildrenQueryFactory({ db: projectDB }) // The complex query branch const result = await getObjectChildrenQuery({ streamId: parent.streamId, @@ -98,6 +101,13 @@ export = { context.resourceAccessRules ) + const projectDB = await getProjectDbClient({ + projectId: args.objectInput.streamId + }) + const createObjects = createObjectsFactory({ + storeObjectsIfNotFoundFactory: storeObjectsIfNotFoundFactory({ db: projectDB }), + storeClosuresIfNotFound: storeClosuresIfNotFoundFactory({ db: projectDB }) + }) const ids = await createObjects({ streamId: args.objectInput.streamId, objects: args.objectInput.objects.filter(isNonNullable) diff --git a/packages/server/modules/core/graph/resolvers/versions.ts b/packages/server/modules/core/graph/resolvers/versions.ts index 19c7f0d75..7366f43f7 100644 --- a/packages/server/modules/core/graph/resolvers/versions.ts +++ b/packages/server/modules/core/graph/resolvers/versions.ts @@ -56,6 +56,7 @@ import { } from '@/modules/activitystream/services/commitActivity' import { getObjectFactory } from '@/modules/core/repositories/objects' import { saveActivityFactory } from '@/modules/activitystream/repositories' +import { getProjectDbClient } from '@/modules/multiregion/dbSelector' const markCommitStreamUpdated = markCommitStreamUpdatedFactory({ db }) const getCommitStream = getCommitStreamFactory({ db }) @@ -117,8 +118,10 @@ const batchDeleteCommits = batchDeleteCommitsFactory({ export = { Project: { async version(parent, args, ctx) { - const version = await ctx.loaders.streams.getStreamCommit - .forStream(parent.id) + const projectDB = await getProjectDbClient({ projectId: parent.id }) + const version = await ctx.loaders + .forRegion({ db: projectDB }) + .streams.getStreamCommit.forStream(parent.id) .load(args.id) if (!version) { throw new CommitNotFoundError('Version not found') @@ -134,7 +137,10 @@ export = { return (await ctx.loaders.users.getUser.load(author)) || null }, async model(parent, _args, ctx) { - return await ctx.loaders.commits.getCommitBranch.load(parent.id) + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) + return await ctx.loaders + .forRegion({ db: projectDB }) + .commits.getCommitBranch.load(parent.id) }, async previewUrl(parent, _args, ctx) { const stream = await ctx.loaders.commits.getCommitStream.load(parent.id) @@ -147,13 +153,16 @@ export = { }, VersionMutations: { async moveToModel(_parent, args, ctx) { + // TODO: how to get streamId here? return await batchMoveCommits(args.input, ctx.userId!) }, async delete(_parent, args, ctx) { + // TODO: how to get streamId here? await batchDeleteCommits(args.input, ctx.userId!) return true }, async update(_parent, args, ctx) { + // TODO: how to get streamId here? const stream = await ctx.loaders.commits.getCommitStream.load( args.input.versionId ) From 1c19f67dd2d451aedcc4a74836cf01c662c52d0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20Jedlicska?= Date: Mon, 11 Nov 2024 06:09:53 +0100 Subject: [PATCH 04/10] feat(models): support streamId, branchId in models --- .../activitystream/services/commitActivity.ts | 10 +++---- .../modules/core/domain/commits/operations.ts | 16 +++++----- .../modules/core/domain/commits/types.ts | 8 ++++- .../modules/core/graph/dataloaders/index.ts | 7 +++-- .../modules/core/graph/resolvers/models.ts | 29 ++++++++++++++----- .../server/modules/core/helpers/graphTypes.ts | 4 +-- .../modules/core/repositories/commits.ts | 11 ++++--- .../modules/shared/utils/subscriptions.ts | 2 +- 8 files changed, 55 insertions(+), 32 deletions(-) diff --git a/packages/server/modules/activitystream/services/commitActivity.ts b/packages/server/modules/activitystream/services/commitActivity.ts index bb880a46e..026db3560 100644 --- a/packages/server/modules/activitystream/services/commitActivity.ts +++ b/packages/server/modules/activitystream/services/commitActivity.ts @@ -40,7 +40,7 @@ export const addCommitCreatedActivityFactory = modelId: string commit: CommitRecord }) => { - const { commitId, input, streamId, userId, branchName, commit } = params + const { commitId, input, streamId, userId, branchName, commit, modelId } = params await Promise.all([ saveActivity({ streamId, @@ -53,7 +53,7 @@ export const addCommitCreatedActivityFactory = commit: { ...input, projectId: streamId, - modelId: params.modelId, + modelId, versionId: commit.id } }, @@ -67,7 +67,7 @@ export const addCommitCreatedActivityFactory = projectId: streamId, projectVersionsUpdated: { id: commit.id, - version: commit, + version: { ...commit, streamId }, type: ProjectVersionsUpdatedMessageType.Created, modelId: null } @@ -123,7 +123,7 @@ export const addCommitUpdatedActivityFactory = projectId: streamId, projectVersionsUpdated: { id: commitId, - version: newCommit, + version: { ...newCommit, streamId }, type: ProjectVersionsUpdatedMessageType.Updated, modelId: null } @@ -162,7 +162,7 @@ export const addCommitMovedActivityFactory = projectId: streamId, projectVersionsUpdated: { id: commitId, - version: commit, + version: { ...commit, streamId }, type: ProjectVersionsUpdatedMessageType.Updated, modelId: null } diff --git a/packages/server/modules/core/domain/commits/operations.ts b/packages/server/modules/core/domain/commits/operations.ts index 56f4de478..589561d83 100644 --- a/packages/server/modules/core/domain/commits/operations.ts +++ b/packages/server/modules/core/domain/commits/operations.ts @@ -1,12 +1,12 @@ import { Branch } from '@/modules/core/domain/branches/types' import { - CommitWithBranchId, CommitWithStreamBranchMetadata, Commit, CommitBranch, CommitWithStreamId, LegacyUserCommit, - LegacyStreamCommit + LegacyStreamCommit, + CommitWithStreamBranchId } from '@/modules/core/domain/commits/types' import { CommitsDeleteInput, @@ -54,7 +54,7 @@ export type GetSpecificBranchCommits = ( branchId: string commitId: string }[] -) => Promise +) => Promise export type StoreCommit = ( params: Omit, 'id' | 'createdAt'> @@ -74,7 +74,7 @@ export type CreateCommitByBranchId = ( options?: Partial<{ notify: boolean }> -) => Promise> +) => Promise export type CreateCommitByBranchName = ( params: NullableKeysToOptional<{ @@ -109,7 +109,7 @@ export type InsertStreamCommits = ( export type UpdateCommitAndNotify = ( params: CommitUpdateInput | UpdateVersionInput, userId: string -) => Promise> +) => Promise export type GetCommitBranches = (commitIds: string[]) => Promise @@ -166,7 +166,7 @@ export type GetUserAuthoredCommitCounts = (params: { export type GetCommitsAndTheirBranchIds = ( commitIds: string[] -) => Promise +) => Promise export type GetBatchedStreamCommits = ( streamId: string, @@ -203,7 +203,7 @@ export type PaginatedBranchCommitsParams = PaginatedBranchCommitsBaseParams & { export type GetPaginatedBranchCommitsItems = ( params: PaginatedBranchCommitsParams ) => Promise<{ - commits: Commit[] + commits: CommitWithStreamBranchId[] cursor: string | null }> @@ -217,7 +217,7 @@ export type GetPaginatedBranchCommits = ( } ) => Promise<{ totalCount: number - items: Commit[] + items: CommitWithStreamBranchId[] cursor: string | null }> diff --git a/packages/server/modules/core/domain/commits/types.ts b/packages/server/modules/core/domain/commits/types.ts index c32d4d9c7..158e8a478 100644 --- a/packages/server/modules/core/domain/commits/types.ts +++ b/packages/server/modules/core/domain/commits/types.ts @@ -14,9 +14,12 @@ export type CommitWithBranchId = Commit & { export type CommitWithStreamId = Commit & { streamId: string } export type BranchLatestCommit = CommitWithBranchId -export type CommitWithStreamBranchMetadata = Commit & { +export type CommitWithStreamBranchId = Commit & { streamId: string branchId: string +} + +export type CommitWithStreamBranchMetadata = CommitWithStreamBranchId & { branchName: string } @@ -31,6 +34,7 @@ export type LegacyUserCommit = { parents: CommitRecord['parents'] createdAt: CommitRecord['createdAt'] branchName: BranchRecord['name'] + branchId: BranchRecord['id'] streamId: StreamCommitRecord['streamId'] streamName: StreamRecord['name'] authorName: UserRecord['name'] @@ -47,6 +51,8 @@ export type LegacyStreamCommit = { parents: CommitRecord['parents'] createdAt: CommitRecord['createdAt'] branchName: BranchRecord['name'] + branchId: BranchRecord['id'] + streamId: StreamCommitRecord['streamId'] authorName: UserRecord['name'] authorId: UserRecord['id'] authorAvatar: UserRecord['avatar'] diff --git a/packages/server/modules/core/graph/dataloaders/index.ts b/packages/server/modules/core/graph/dataloaders/index.ts index 0db7d5b90..9976276ef 100644 --- a/packages/server/modules/core/graph/dataloaders/index.ts +++ b/packages/server/modules/core/graph/dataloaders/index.ts @@ -89,7 +89,10 @@ import { getUsersFactory, UserWithOptionalRole } from '@/modules/core/repositories/users' -import { CommitWithStreamBranchMetadata } from '@/modules/core/domain/commits/types' +import { + CommitWithStreamBranchId, + CommitWithStreamBranchMetadata +} from '@/modules/core/domain/commits/types' declare module '@/modules/core/loaders' { interface ModularizedDataLoaders extends ReturnType {} @@ -369,7 +372,7 @@ const dataLoadersDefinition = defineRequestDataloaders( }), getBranchCommit: createLoader< { branchId: string; commitId: string }, - Nullable, + Nullable, string >( async (idPairs) => { diff --git a/packages/server/modules/core/graph/resolvers/models.ts b/packages/server/modules/core/graph/resolvers/models.ts index bef995d3a..5e764b035 100644 --- a/packages/server/modules/core/graph/resolvers/models.ts +++ b/packages/server/modules/core/graph/resolvers/models.ts @@ -61,19 +61,34 @@ import { } from '@/modules/core/repositories/streams' import { ModelsEmitter } from '@/modules/core/events/modelsEmitter' import { saveActivityFactory } from '@/modules/activitystream/repositories' -import { getProjectDbClient } from '@/modules/multiregion/dbSelector' +import { + getProjectDbClient, + getRegisteredRegionClients +} from '@/modules/multiregion/dbSelector' export = { User: { async versions(parent, args, ctx) { const authoredOnly = args.authoredOnly + const regionClients = await getRegisteredRegionClients() + const allLoaders = [ + ctx.loaders, + ...Object.values(regionClients).map((db) => ctx.loaders.forRegion({ db })) + ] + let counts: number[] + if (authoredOnly) { + counts = await Promise.all( + allLoaders.map((loader) => + loader.users.getAuthoredCommitCount.load(parent.id) + ) + ) + } else { + counts = await Promise.all( + allLoaders.map((loader) => loader.users.getStreamCommitCount.load(parent.id)) + ) + } return { - totalCount: authoredOnly - ? // TODO: make one dataloader for region and sum - await ctx.loaders - .forRegion({ db: regionDB }) - .users.getAuthoredCommitCount.load(parent.id) - : await ctx.loaders.users.getStreamCommitCount.load(parent.id) + totalCount: counts.reduce((acc, curr) => acc + curr, 0) } } }, diff --git a/packages/server/modules/core/helpers/graphTypes.ts b/packages/server/modules/core/helpers/graphTypes.ts index 4e0b0507c..e0dd6551b 100644 --- a/packages/server/modules/core/helpers/graphTypes.ts +++ b/packages/server/modules/core/helpers/graphTypes.ts @@ -1,5 +1,5 @@ import { - CommitWithStreamBranchMetadata, + CommitWithStreamBranchId, LegacyStreamCommit, LegacyUserCommit } from '@/modules/core/domain/commits/types' @@ -44,7 +44,7 @@ export type ProjectGraphQLReturn = StreamGraphQLReturn export type ModelGraphQLReturn = BranchRecord -export type VersionGraphQLReturn = Omit +export type VersionGraphQLReturn = CommitWithStreamBranchId export type LimitedUserGraphQLReturn = Omit< LimitedUser, diff --git a/packages/server/modules/core/repositories/commits.ts b/packages/server/modules/core/repositories/commits.ts index ad357c929..3d73c7d2c 100644 --- a/packages/server/modules/core/repositories/commits.ts +++ b/packages/server/modules/core/repositories/commits.ts @@ -23,6 +23,7 @@ import { import { Knex } from 'knex' import { MaybeNullOrUndefined, Optional } from '@speckle/shared' import { + CommitWithStreamBranchId, CommitWithStreamBranchMetadata, LegacyStreamCommit, LegacyUserCommit @@ -251,7 +252,7 @@ export const getCommitsAndTheirBranchIdsFactory = return await tables .commits(deps.db) - .select>([ + .select>([ ...Commits.cols, BranchCommits.col.branchId ]) @@ -269,7 +270,7 @@ export const getSpecificBranchCommitsFactory = const q = tables .commits(deps.db) - .select>>([ + .select>([ ...Commits.cols, knex.raw(`(array_agg(??))[1] as "branchId"`, [BranchCommits.col.branchId]), knex.raw(`(array_agg(??))[1] as "streamId"`, [StreamCommits.col.streamId]) @@ -281,7 +282,7 @@ export const getSpecificBranchCommitsFactory = .groupBy(Commits.col.id) const queryResults = await q - const results: Array = [] + const results: Array = [] for (const pair of pairs) { const commit = queryResults.find( @@ -297,9 +298,7 @@ export const getSpecificBranchCommitsFactory = const getPaginatedBranchCommitsBaseQueryFactory = (deps: { db: Knex }) => - []>( - params: PaginatedBranchCommitsBaseParams - ) => { + (params: PaginatedBranchCommitsBaseParams) => { const { branchId, filter } = params const q = tables diff --git a/packages/server/modules/shared/utils/subscriptions.ts b/packages/server/modules/shared/utils/subscriptions.ts index 41ce47789..a2c7f3e54 100644 --- a/packages/server/modules/shared/utils/subscriptions.ts +++ b/packages/server/modules/shared/utils/subscriptions.ts @@ -183,7 +183,7 @@ type SubscriptionTypeMap = { payload: { projectVersionsUpdated: Merge< ProjectVersionsUpdatedMessage, - { version: Nullable } + { version: Nullable> } > projectId: string } From 153b1c3802f3f67de4c673e126bd79c3ddd3bc66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20Jedlicska?= Date: Mon, 11 Nov 2024 09:57:22 +0100 Subject: [PATCH 05/10] feat(version): require project id in version mutations --- .../core/typedefs/modelsAndVersions.graphql | 9 +- .../activitystream/services/commitActivity.ts | 20 --- .../modules/core/graph/generated/graphql.ts | 9 +- .../modules/core/graph/resolvers/versions.ts | 143 ++++++++++-------- .../core/services/commit/management.ts | 74 ++++----- .../graph/generated/graphql.ts | 9 +- .../server/test/graphql/generated/graphql.ts | 9 +- 7 files changed, 143 insertions(+), 130 deletions(-) diff --git a/packages/server/assets/core/typedefs/modelsAndVersions.graphql b/packages/server/assets/core/typedefs/modelsAndVersions.graphql index f03b38464..92dc0a362 100644 --- a/packages/server/assets/core/typedefs/modelsAndVersions.graphql +++ b/packages/server/assets/core/typedefs/modelsAndVersions.graphql @@ -160,7 +160,8 @@ type ModelMutations { } input MoveVersionsInput { - versionIds: [String!]! + projectId: ID! + versionIds: [ID!]! """ If the name references a nonexistant model, it will be created """ @@ -168,14 +169,16 @@ input MoveVersionsInput { } input DeleteVersionsInput { - versionIds: [String!]! + projectId: ID! + versionIds: [ID!]! } """ Only non-null values will be updated """ input UpdateVersionInput { - versionId: String! + projectId: ID! + versionId: ID! message: String } diff --git a/packages/server/modules/activitystream/services/commitActivity.ts b/packages/server/modules/activitystream/services/commitActivity.ts index 026db3560..af871501f 100644 --- a/packages/server/modules/activitystream/services/commitActivity.ts +++ b/packages/server/modules/activitystream/services/commitActivity.ts @@ -5,7 +5,6 @@ import { } from '@/modules/shared/utils/subscriptions' import { CommitCreateInput, - CommitReceivedInput, CommitUpdateInput, ProjectVersionsUpdatedMessageType, UpdateVersionInput @@ -211,22 +210,3 @@ export const addCommitDeletedActivityFactory = }) ]) } - -export const addCommitReceivedActivityFactory = - ({ saveActivity }: { saveActivity: SaveActivity }) => - async (params: { input: CommitReceivedInput; userId: string }) => { - const { input, userId } = params - - await saveActivity({ - streamId: input.streamId, - resourceType: ResourceTypes.Commit, - resourceId: input.commitId, - actionType: ActionTypes.Commit.Receive, - userId, - info: { - sourceApplication: input.sourceApplication, - message: input.message - }, - message: `Commit ${input.commitId} was received by user ${userId}` - }) - } diff --git a/packages/server/modules/core/graph/generated/graphql.ts b/packages/server/modules/core/graph/generated/graphql.ts index 90f6d80a3..dcbcf6980 100644 --- a/packages/server/modules/core/graph/generated/graphql.ts +++ b/packages/server/modules/core/graph/generated/graphql.ts @@ -889,7 +889,8 @@ export type DeleteUserEmailInput = { }; export type DeleteVersionsInput = { - versionIds: Array; + projectId: Scalars['ID']['input']; + versionIds: Array; }; export enum DiscoverableStreamsSortType { @@ -1222,9 +1223,10 @@ export type ModelsTreeItemCollection = { }; export type MoveVersionsInput = { + projectId: Scalars['ID']['input']; /** If the name references a nonexistant model, it will be created */ targetModelName: Scalars['String']['input']; - versionIds: Array; + versionIds: Array; }; export type Mutation = { @@ -3515,7 +3517,8 @@ export type UpdateServerRegionInput = { /** Only non-null values will be updated */ export type UpdateVersionInput = { message?: InputMaybe; - versionId: Scalars['String']['input']; + projectId: Scalars['ID']['input']; + versionId: Scalars['ID']['input']; }; /** diff --git a/packages/server/modules/core/graph/resolvers/versions.ts b/packages/server/modules/core/graph/resolvers/versions.ts index 7366f43f7..5fce4d17d 100644 --- a/packages/server/modules/core/graph/resolvers/versions.ts +++ b/packages/server/modules/core/graph/resolvers/versions.ts @@ -14,7 +14,7 @@ import { import { CommitNotFoundError, CommitUpdateError } from '@/modules/core/errors/commit' import { createCommitByBranchIdFactory, - markCommitReceivedAndNotify, + markCommitReceivedAndNotifyFactory, updateCommitAndNotifyFactory } from '@/modules/core/services/commit/management' import { @@ -58,63 +58,6 @@ import { getObjectFactory } from '@/modules/core/repositories/objects' import { saveActivityFactory } from '@/modules/activitystream/repositories' import { getProjectDbClient } from '@/modules/multiregion/dbSelector' -const markCommitStreamUpdated = markCommitStreamUpdatedFactory({ db }) -const getCommitStream = getCommitStreamFactory({ db }) -const getStream = getStreamFactory({ db }) -const getStreams = getStreamsFactory({ db }) -const getObject = getObjectFactory({ db }) -const createCommitByBranchId = createCommitByBranchIdFactory({ - createCommit: createCommitFactory({ db }), - getObject, - getBranchById: getBranchByIdFactory({ db }), - insertStreamCommits: insertStreamCommitsFactory({ db }), - insertBranchCommits: insertBranchCommitsFactory({ db }), - markCommitStreamUpdated, - markCommitBranchUpdated: markCommitBranchUpdatedFactory({ db }), - versionsEventEmitter: VersionsEmitter.emit, - addCommitCreatedActivity: addCommitCreatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) -}) - -const updateCommitAndNotify = updateCommitAndNotifyFactory({ - getCommit: getCommitFactory({ db }), - getStream, - getCommitStream, - getStreamBranchByName: getStreamBranchByNameFactory({ db }), - getCommitBranch: getCommitBranchFactory({ db }), - switchCommitBranch: switchCommitBranchFactory({ db }), - updateCommit: updateCommitFactory({ db }), - addCommitUpdatedActivity: addCommitUpdatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }), - markCommitStreamUpdated, - markCommitBranchUpdated: markCommitBranchUpdatedFactory({ db }) -}) - -const batchMoveCommits = batchMoveCommitsFactory({ - getCommits: getCommitsFactory({ db }), - getStreams, - getStreamBranchByName: getStreamBranchByNameFactory({ db }), - createBranch: createBranchFactory({ db }), - moveCommitsToBranch: moveCommitsToBranchFactory({ db }), - addCommitMovedActivity: addCommitMovedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) -}) -const batchDeleteCommits = batchDeleteCommitsFactory({ - getCommits: getCommitsFactory({ db }), - getStreams, - deleteCommits: deleteCommitsFactory({ db }), - addCommitDeletedActivity: addCommitDeletedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) -}) - export = { Project: { async version(parent, args, ctx) { @@ -134,7 +77,11 @@ export = { async authorUser(parent, _args, ctx) { const { author } = parent if (!author) return null - return (await ctx.loaders.users.getUser.load(author)) || null + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) + return ( + (await ctx.loaders.forRegion({ db: projectDB }).users.getUser.load(author)) || + null + ) }, async model(parent, _args, ctx) { const projectDB = await getProjectDbClient({ projectId: parent.streamId }) @@ -143,7 +90,10 @@ export = { .commits.getCommitBranch.load(parent.id) }, async previewUrl(parent, _args, ctx) { - const stream = await ctx.loaders.commits.getCommitStream.load(parent.id) + const projectDB = await getProjectDbClient({ projectId: parent.streamId }) + const stream = await ctx.loaders + .forRegion({ db: projectDB }) + .commits.getCommitStream.load(parent.id) const path = `/preview/${stream!.id}/commits/${parent.id}` return new URL(path, getServerOrigin()).toString() } @@ -154,18 +104,46 @@ export = { VersionMutations: { async moveToModel(_parent, args, ctx) { // TODO: how to get streamId here? + const projectId = args.input.projectId + const projectDb = await getProjectDbClient({ projectId }) + + const batchMoveCommits = batchMoveCommitsFactory({ + getCommits: getCommitsFactory({ db: projectDb }), + getStreams: getStreamsFactory({ db: projectDb }), + getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }), + createBranch: createBranchFactory({ db: projectDb }), + moveCommitsToBranch: moveCommitsToBranchFactory({ db: projectDb }), + addCommitMovedActivity: addCommitMovedActivityFactory({ + saveActivity: saveActivityFactory({ db }), + publish + }) + }) return await batchMoveCommits(args.input, ctx.userId!) }, async delete(_parent, args, ctx) { // TODO: how to get streamId here? + const projectId = args.input.projectId + const projectDb = await getProjectDbClient({ projectId }) + + const batchDeleteCommits = batchDeleteCommitsFactory({ + getCommits: getCommitsFactory({ db: projectDb }), + getStreams: getStreamsFactory({ db: projectDb }), + deleteCommits: deleteCommitsFactory({ db: projectDb }), + addCommitDeletedActivity: addCommitDeletedActivityFactory({ + saveActivity: saveActivityFactory({ db }), + publish + }) + }) await batchDeleteCommits(args.input, ctx.userId!) return true }, async update(_parent, args, ctx) { // TODO: how to get streamId here? - const stream = await ctx.loaders.commits.getCommitStream.load( - args.input.versionId - ) + const projectId = args.input.projectId + const projectDb = await getProjectDbClient({ projectId }) + const stream = await ctx.loaders + .forRegion({ db: projectDb }) + .commits.getCommitStream.load(args.input.versionId) if (!stream) { throw new CommitUpdateError('Commit stream not found') } @@ -176,6 +154,22 @@ export = { Roles.Stream.Contributor, ctx.resourceAccessRules ) + + const updateCommitAndNotify = updateCommitAndNotifyFactory({ + getCommit: getCommitFactory({ db: projectDb }), + getStream: getStreamFactory({ db: projectDb }), + getCommitStream: getCommitStreamFactory({ db: projectDb }), + getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }), + getCommitBranch: getCommitBranchFactory({ db: projectDb }), + switchCommitBranch: switchCommitBranchFactory({ db: projectDb }), + updateCommit: updateCommitFactory({ db: projectDb }), + addCommitUpdatedActivity: addCommitUpdatedActivityFactory({ + saveActivity: saveActivityFactory({ db }), + publish + }), + markCommitStreamUpdated: markCommitStreamUpdatedFactory({ db: projectDb }), + markCommitBranchUpdated: markCommitBranchUpdatedFactory({ db: projectDb }) + }) return await updateCommitAndNotify(args.input, ctx.userId!) }, async create(_parent, args, ctx) { @@ -191,6 +185,23 @@ export = { throw new RateLimitError(rateLimitResult) } + const projectDb = await getProjectDbClient({ projectId: args.input.projectId }) + + const createCommitByBranchId = createCommitByBranchIdFactory({ + createCommit: createCommitFactory({ db: projectDb }), + getObject: getObjectFactory({ db: projectDb }), + getBranchById: getBranchByIdFactory({ db: projectDb }), + insertStreamCommits: insertStreamCommitsFactory({ db: projectDb }), + insertBranchCommits: insertBranchCommitsFactory({ db: projectDb }), + markCommitStreamUpdated: markCommitStreamUpdatedFactory({ db: projectDb }), + markCommitBranchUpdated: markCommitBranchUpdatedFactory({ db: projectDb }), + versionsEventEmitter: VersionsEmitter.emit, + addCommitCreatedActivity: addCommitCreatedActivityFactory({ + saveActivity: saveActivityFactory({ db }), + publish + }) + }) + const commit = await createCommitByBranchId({ authorId: ctx.userId!, streamId: args.input.projectId, @@ -211,8 +222,12 @@ export = { Roles.Stream.Reviewer, ctx.resourceAccessRules ) + const projectDb = await getProjectDbClient({ projectId: args.input.projectId }) - await markCommitReceivedAndNotify({ + await markCommitReceivedAndNotifyFactory({ + getCommit: getCommitFactory({ db: projectDb }), + saveActivity: saveActivityFactory({ db }) + })({ input: args.input, userId: ctx.userId! }) diff --git a/packages/server/modules/core/services/commit/management.ts b/packages/server/modules/core/services/commit/management.ts index 7e179f57f..79c13e20b 100644 --- a/packages/server/modules/core/services/commit/management.ts +++ b/packages/server/modules/core/services/commit/management.ts @@ -1,11 +1,10 @@ -import { db } from '@/db/knex' import { AddCommitCreatedActivity, AddCommitDeletedActivity, - AddCommitUpdatedActivity + AddCommitUpdatedActivity, + SaveActivity } from '@/modules/activitystream/domain/operations' -import { saveActivityFactory } from '@/modules/activitystream/repositories' -import { addCommitReceivedActivityFactory } from '@/modules/activitystream/services/commitActivity' +import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types' import { GetBranchById, GetStreamBranchByName, @@ -48,42 +47,49 @@ import { UpdateVersionInput } from '@/modules/core/graph/generated/graphql' import { BranchRecord, CommitRecord } from '@/modules/core/helpers/types' -import { getCommitFactory } from '@/modules/core/repositories/commits' import { ensureError, Roles } from '@speckle/shared' import { has } from 'lodash' -export async function markCommitReceivedAndNotify(params: { - input: MarkReceivedVersionInput | CommitReceivedInput - userId: string -}) { - const { input, userId } = params +export const markCommitReceivedAndNotifyFactory = + ({ getCommit, saveActivity }: { getCommit: GetCommit; saveActivity: SaveActivity }) => + async (params: { + input: MarkReceivedVersionInput | CommitReceivedInput + userId: string + }) => { + const { input, userId } = params - const oldInput: CommitReceivedInput = - 'projectId' in input - ? { - ...input, - streamId: input.projectId, - commitId: input.versionId - } - : input + const oldInput: CommitReceivedInput = + 'projectId' in input + ? { + ...input, + streamId: input.projectId, + commitId: input.versionId + } + : input - const commit = await getCommitFactory({ db })(oldInput.commitId, { - streamId: oldInput.streamId - }) - if (!commit) { - throw new CommitReceiveError( - `Failed to find commit with id ${oldInput.commitId} in stream ${oldInput.streamId}.`, - { info: params } - ) - } - - await addCommitReceivedActivityFactory({ saveActivity: saveActivityFactory({ db }) })( - { - input: oldInput, - userId + const commit = await getCommit(oldInput.commitId, { + streamId: oldInput.streamId + }) + if (!commit) { + throw new CommitReceiveError( + `Failed to find commit with id ${oldInput.commitId} in stream ${oldInput.streamId}.`, + { info: params } + ) } - ) -} + + await saveActivity({ + streamId: oldInput.streamId, + resourceType: ResourceTypes.Commit, + resourceId: oldInput.commitId, + actionType: ActionTypes.Commit.Receive, + userId, + info: { + sourceApplication: input.sourceApplication, + message: input.message + }, + message: `Commit ${oldInput.commitId} was received by user ${userId}` + }) + } export const createCommitByBranchIdFactory = (deps: { diff --git a/packages/server/modules/cross-server-sync/graph/generated/graphql.ts b/packages/server/modules/cross-server-sync/graph/generated/graphql.ts index d96992662..d198006cf 100644 --- a/packages/server/modules/cross-server-sync/graph/generated/graphql.ts +++ b/packages/server/modules/cross-server-sync/graph/generated/graphql.ts @@ -870,7 +870,8 @@ export type DeleteUserEmailInput = { }; export type DeleteVersionsInput = { - versionIds: Array; + projectId: Scalars['ID']['input']; + versionIds: Array; }; export enum DiscoverableStreamsSortType { @@ -1203,9 +1204,10 @@ export type ModelsTreeItemCollection = { }; export type MoveVersionsInput = { + projectId: Scalars['ID']['input']; /** If the name references a nonexistant model, it will be created */ targetModelName: Scalars['String']['input']; - versionIds: Array; + versionIds: Array; }; export type Mutation = { @@ -3496,7 +3498,8 @@ export type UpdateServerRegionInput = { /** Only non-null values will be updated */ export type UpdateVersionInput = { message?: InputMaybe; - versionId: Scalars['String']['input']; + projectId: Scalars['ID']['input']; + versionId: Scalars['ID']['input']; }; /** diff --git a/packages/server/test/graphql/generated/graphql.ts b/packages/server/test/graphql/generated/graphql.ts index 855617e4a..7df2f5662 100644 --- a/packages/server/test/graphql/generated/graphql.ts +++ b/packages/server/test/graphql/generated/graphql.ts @@ -871,7 +871,8 @@ export type DeleteUserEmailInput = { }; export type DeleteVersionsInput = { - versionIds: Array; + projectId: Scalars['ID']['input']; + versionIds: Array; }; export enum DiscoverableStreamsSortType { @@ -1204,9 +1205,10 @@ export type ModelsTreeItemCollection = { }; export type MoveVersionsInput = { + projectId: Scalars['ID']['input']; /** If the name references a nonexistant model, it will be created */ targetModelName: Scalars['String']['input']; - versionIds: Array; + versionIds: Array; }; export type Mutation = { @@ -3497,7 +3499,8 @@ export type UpdateServerRegionInput = { /** Only non-null values will be updated */ export type UpdateVersionInput = { message?: InputMaybe; - versionId: Scalars['String']['input']; + projectId: Scalars['ID']['input']; + versionId: Scalars['ID']['input']; }; /** From ebce930b9ce39b43af37f8a338a6319bb283b8c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20Jedlicska?= Date: Mon, 11 Nov 2024 09:58:07 +0100 Subject: [PATCH 06/10] feat(frontend2): pass projectId props where needed --- .../project/model-page/dialog/Delete.vue | 4 +- .../project/model-page/dialog/EditMessage.vue | 2 + .../project/model-page/dialog/MoveTo.vue | 4 +- .../lib/common/generated/gql/graphql.ts | 9 ++-- .../projects/composables/versionManagement.ts | 41 ++++++++----------- 5 files changed, 29 insertions(+), 31 deletions(-) diff --git a/packages/frontend-2/components/project/model-page/dialog/Delete.vue b/packages/frontend-2/components/project/model-page/dialog/Delete.vue index 27f6856eb..a2148afe4 100644 --- a/packages/frontend-2/components/project/model-page/dialog/Delete.vue +++ b/packages/frontend-2/components/project/model-page/dialog/Delete.vue @@ -53,7 +53,7 @@ const emit = defineEmits<{ const props = defineProps<{ versions: ProjectModelPageDialogDeleteVersionFragment[] open: boolean - projectId?: string + projectId: string modelId?: string }>() @@ -71,10 +71,10 @@ const onDelete = async () => { loading.value = true const success = await deleteVersions( { + projectId: props.projectId, versionIds: props.versions.map((v) => v.id) }, { - projectId: props.projectId, modelId: props.modelId } ) diff --git a/packages/frontend-2/components/project/model-page/dialog/EditMessage.vue b/packages/frontend-2/components/project/model-page/dialog/EditMessage.vue index 06453c1b1..8c595eb29 100644 --- a/packages/frontend-2/components/project/model-page/dialog/EditMessage.vue +++ b/packages/frontend-2/components/project/model-page/dialog/EditMessage.vue @@ -57,6 +57,7 @@ const emit = defineEmits<{ }>() const props = defineProps<{ + projectId: string version: Nullable open: boolean }>() @@ -85,6 +86,7 @@ const onSubmit = handleSubmit(async ({ newMessage }) => { loading.value = true const success = !!(await updateVersion({ + projectId: props.projectId, versionId: props.version?.id, message: newMessage })) diff --git a/packages/frontend-2/components/project/model-page/dialog/MoveTo.vue b/packages/frontend-2/components/project/model-page/dialog/MoveTo.vue index 4f16f2150..fa23e2a93 100644 --- a/packages/frontend-2/components/project/model-page/dialog/MoveTo.vue +++ b/packages/frontend-2/components/project/model-page/dialog/MoveTo.vue @@ -81,13 +81,13 @@ const onMove = async (targetModelName: string, newModelCreated?: boolean) => { loading.value = true const success = await moveVersions( { + projectId: props.projectId, versionIds: props.versions.map((v) => v.id), targetModelName }, { previousModelId: props.modelId, - newModelCreated, - projectId: props.projectId + newModelCreated } ) loading.value = false diff --git a/packages/frontend-2/lib/common/generated/gql/graphql.ts b/packages/frontend-2/lib/common/generated/gql/graphql.ts index d85390665..0aaa5c771 100644 --- a/packages/frontend-2/lib/common/generated/gql/graphql.ts +++ b/packages/frontend-2/lib/common/generated/gql/graphql.ts @@ -867,7 +867,8 @@ export type DeleteUserEmailInput = { }; export type DeleteVersionsInput = { - versionIds: Array; + projectId: Scalars['ID']['input']; + versionIds: Array; }; export enum DiscoverableStreamsSortType { @@ -1200,9 +1201,10 @@ export type ModelsTreeItemCollection = { }; export type MoveVersionsInput = { + projectId: Scalars['ID']['input']; /** If the name references a nonexistant model, it will be created */ targetModelName: Scalars['String']['input']; - versionIds: Array; + versionIds: Array; }; export type Mutation = { @@ -3493,7 +3495,8 @@ export type UpdateServerRegionInput = { /** Only non-null values will be updated */ export type UpdateVersionInput = { message?: InputMaybe; - versionId: Scalars['String']['input']; + projectId: Scalars['ID']['input']; + versionId: Scalars['ID']['input']; }; /** diff --git a/packages/frontend-2/lib/projects/composables/versionManagement.ts b/packages/frontend-2/lib/projects/composables/versionManagement.ts index 89ac33c7c..2171af690 100644 --- a/packages/frontend-2/lib/projects/composables/versionManagement.ts +++ b/packages/frontend-2/lib/projects/composables/versionManagement.ts @@ -350,7 +350,6 @@ export function useDeleteVersions() { * Various options for better cache updates, set if possible */ options?: Partial<{ - projectId: string modelId: string }> ) => { @@ -372,26 +371,21 @@ export function useDeleteVersions() { } // Update totalCounts in project - if (options?.projectId) { - modifyObjectFields( - cache, - getCacheId('Project', options.projectId), - (_fieldName, _variables, data) => { - return { - ...data, - ...(!isUndefined(data.totalCount) - ? { - totalCount: Math.max( - data.totalCount - input.versionIds.length, - 0 - ) - } - : {}) - } - }, - { fieldNameWhitelist: ['versions'] } - ) - } + modifyObjectFields( + cache, + getCacheId('Project', input.projectId), + (_fieldName, _variables, data) => { + return { + ...data, + ...(!isUndefined(data.totalCount) + ? { + totalCount: Math.max(data.totalCount - input.versionIds.length, 0) + } + : {}) + } + }, + { fieldNameWhitelist: ['versions'] } + ) // Update totalCounts in model if (options?.modelId) { @@ -458,7 +452,6 @@ export function useMoveVersions() { options?: Partial<{ previousModelId: string newModelCreated: boolean - projectId: string }> ) => { if (!input.versionIds.length || !input.targetModelName.trim()) return @@ -551,8 +544,8 @@ export function useMoveVersions() { { fieldNameWhitelist: ['versions'] } ) - if (options?.newModelCreated && options?.projectId) { - evictProjectModels(options.projectId) + if (options?.newModelCreated) { + evictProjectModels(input.projectId) } } }) From 76367c30c2bd023fe9008f14ff12ace676d2887a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20Jedlicska?= Date: Mon, 11 Nov 2024 10:23:35 +0100 Subject: [PATCH 07/10] feat(commits): proper dep initialization --- packages/server/modules/core/graph/resolvers/commits.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/server/modules/core/graph/resolvers/commits.ts b/packages/server/modules/core/graph/resolvers/commits.ts index edb359e67..fbd61bfb6 100644 --- a/packages/server/modules/core/graph/resolvers/commits.ts +++ b/packages/server/modules/core/graph/resolvers/commits.ts @@ -11,7 +11,7 @@ import { legacyGetPaginatedStreamCommitsFactory } from '@/modules/core/services/commit/retrieval' import { - markCommitReceivedAndNotify, + markCommitReceivedAndNotifyFactory, deleteCommitAndNotifyFactory, createCommitByBranchIdFactory, createCommitByBranchNameFactory, @@ -352,7 +352,10 @@ export = { context.resourceAccessRules ) - await markCommitReceivedAndNotify({ + await markCommitReceivedAndNotifyFactory({ + getCommit: getCommitFactory({ db }), + saveActivity: saveActivityFactory({ db }) + })({ input: args.input, userId: context.userId! }) From fcb8dd275a0f000743b1f92167ae1d2e336b7c74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20Jedlicska?= Date: Mon, 11 Nov 2024 10:40:28 +0100 Subject: [PATCH 08/10] fix(automate): some gql generation mixup --- .../server/modules/automate/graph/resolvers/automate.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/server/modules/automate/graph/resolvers/automate.ts b/packages/server/modules/automate/graph/resolvers/automate.ts index b0c1b2ab7..07db8a6e7 100644 --- a/packages/server/modules/automate/graph/resolvers/automate.ts +++ b/packages/server/modules/automate/graph/resolvers/automate.ts @@ -855,10 +855,9 @@ export = (FF_AUTOMATE_MODULE_ENABLED } }, User: { - automateInfo: () => ({ - hasAutomateGithubApp: false, - availableGithubOrgs: [] - }) + automateInfo: () => { + throw new AutomateApiDisabledError() + } }, ServerInfo: { automate: () => ({ From 603861197472e4d7555b4589a0c350f48930989e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20Jedlicska?= <57442769+gjedlicska@users.noreply.github.com> Date: Mon, 11 Nov 2024 17:10:29 +0100 Subject: [PATCH 09/10] feat(webhook-service): learn to speak multi region (#3473) * feat(webhook-service): learn to speak multi region * refactor(webhook-service): remove unnecesary factories * docs(activities): brain dump * fix(shared): need to add knex as a dev dep * fix(shared): align dev dep --- packages/server/knexfile.ts | 72 ++---- .../activitystream/repositories/index.ts | 11 +- .../server/modules/multiregion/dbSelector.ts | 31 +-- .../modules/multiregion/domain/operations.ts | 4 +- .../modules/multiregion/domain/types.ts | 15 +- .../modules/multiregion/helpers/validation.ts | 34 --- .../modules/multiregion/regionConfig.ts | 58 ++--- .../tests/e2e/serverAdmin.graph.spec.ts | 4 +- packages/shared/package.json | 2 + .../src/environment/multiRegionConfig.ts | 154 +++++++++++++ packages/webhook-service/.vscode/launch.json | 16 ++ packages/webhook-service/src/knex.js | 63 ++++-- packages/webhook-service/src/main.js | 212 +++++++++--------- .../src/observability/prometheusMetrics.js | 9 +- yarn.lock | 2 + 15 files changed, 394 insertions(+), 293 deletions(-) delete mode 100644 packages/server/modules/multiregion/helpers/validation.ts create mode 100644 packages/shared/src/environment/multiRegionConfig.ts create mode 100644 packages/webhook-service/.vscode/launch.json diff --git a/packages/server/knexfile.ts b/packages/server/knexfile.ts index 37ab1320e..1da749fc1 100644 --- a/packages/server/knexfile.ts +++ b/packages/server/knexfile.ts @@ -1,5 +1,4 @@ /* eslint-disable no-restricted-imports */ -/* eslint-disable camelcase */ /* istanbul ignore file */ import { packageRoot } from './bootstrap' import fs from 'fs' @@ -10,8 +9,14 @@ import { postgresMaxConnections, isDevOrTestEnv } from '@/modules/shared/helpers/envHelper' -import { dbLogger as logger } from './logging/logging' +import { dbLogger as logger } from '@/logging/logging' import { Knex } from 'knex' +import { + createKnexConfig, + configureKnexClient, + KnexConfigArgs, + RegionServerConfig +} from '@speckle/shared/dist/commonjs/environment/multiRegionConfig.js' function walk(dir: string) { let results: string[] = [] @@ -69,67 +74,38 @@ if (env.POSTGRES_USER && env.POSTGRES_PASSWORD) { // this is why the new datetime columns are created like this // table.specificType('createdAt', 'TIMESTAMPTZ(3)').defaultTo(knex.fn.now()) -export const createKnexConfig = ({ - connectionString, - caCertificate -}: { - connectionString?: string - caCertificate?: string | undefined -}): Knex.Config => { - return { - client: 'pg', - migrations: { - extension: 'ts', - loadExtensions: isTestEnv() ? ['.js', '.ts'] : ['.js'], - directory: migrationDirs - }, - log: { - warn(message: unknown) { - logger.warn(message) - }, - error(message: unknown) { - logger.error(message) - }, - deprecate(message: unknown) { - logger.info(message) - }, - debug(message: unknown) { - logger.debug(message) - } - }, - connection: { - connectionString, - ssl: caCertificate ? { ca: caCertificate, rejectUnauthorized: true } : undefined, - application_name: 'speckle_server' - }, - // we wish to avoid leaking sql queries in the logs: https://knexjs.org/guide/#compilesqlonerror - compileSqlOnError: isDevOrTestEnv(), - asyncStackTraces: isDevOrTestEnv(), - pool: { - min: 0, - max: postgresMaxConnections(), - acquireTimeoutMillis: 16000, //allows for 3x creation attempts plus idle time between attempts - createTimeoutMillis: 5000 - } - } +const configArgs: KnexConfigArgs = { + migrationDirs, + isTestEnv: isTestEnv(), + isDevOrTestEnv: isDevOrTestEnv(), + applicationName: 'speckle_server', + logger, + maxConnections: postgresMaxConnections() } const config: Record = { test: { ...createKnexConfig({ - connectionString: connectionUri || 'postgres://127.0.0.1/speckle2_test' + connectionString: connectionUri || 'postgres://127.0.0.1/speckle2_test', + ...configArgs }) }, development: { ...createKnexConfig({ - connectionString: connectionUri || 'postgres://127.0.0.1/speckle2_dev' + connectionString: connectionUri || 'postgres://127.0.0.1/speckle2_dev', + ...configArgs }) }, production: { ...createKnexConfig({ - connectionString: connectionUri + connectionString: connectionUri, + ...configArgs }) } } +export const configureClient = (config: RegionServerConfig) => { + return configureKnexClient(config, configArgs) +} + export default config diff --git a/packages/server/modules/activitystream/repositories/index.ts b/packages/server/modules/activitystream/repositories/index.ts index bf5bbb5f2..758c97312 100644 --- a/packages/server/modules/activitystream/repositories/index.ts +++ b/packages/server/modules/activitystream/repositories/index.ts @@ -27,6 +27,7 @@ import { Knex } from 'knex' import { getStreamFactory } from '@/modules/core/repositories/streams' import { getUserFactory } from '@/modules/core/repositories/users' import { getServerInfoFactory } from '@/modules/core/repositories/server' +import { getProjectDbClient } from '@/modules/multiregion/dbSelector' const tables = { streamActivity: (db: Knex) => @@ -253,11 +254,15 @@ export const saveActivityFactory = } } + const projectDb = await getProjectDbClient({ projectId: streamId }) + // yes, we're manually instantiating this thing here, but i do not want to go through all the places, + // where we're calling saveActivity! + // the whole activity module will need to be refactored to use the eventBus await dispatchStreamEventFactory({ - getStreamWebhooks: getStreamWebhooksFactory({ db }), + getStreamWebhooks: getStreamWebhooksFactory({ db: projectDb }), getServerInfo: getServerInfoFactory({ db }), - getStream: getStreamFactory({ db }), - createWebhookEvent: createWebhookEventFactory({ db }), + getStream: getStreamFactory({ db: projectDb }), + createWebhookEvent: createWebhookEventFactory({ db: projectDb }), getUser: getUserFactory({ db }) })({ streamId, diff --git a/packages/server/modules/multiregion/dbSelector.ts b/packages/server/modules/multiregion/dbSelector.ts index dcc645273..905dd8736 100644 --- a/packages/server/modules/multiregion/dbSelector.ts +++ b/packages/server/modules/multiregion/dbSelector.ts @@ -13,16 +13,15 @@ import { GetRegionDb } from '@/modules/multiregion/services/projectRegion' import { getGenericRedis } from '@/modules/shared/redis/redis' -import knex, { Knex } from 'knex' +import { Knex } from 'knex' import { getRegionFactory, getRegionsFactory } from '@/modules/multiregion/repositories' import { MisconfiguredEnvironmentError } from '@/modules/shared/errors' -import { createKnexConfig } from '@/knexfile' +import { configureClient } from '@/knexfile' import { InitializeRegion } from '@/modules/multiregion/domain/operations' import { getAvailableRegionConfig, getMainRegionConfig } from '@/modules/multiregion/regionConfig' -import { RegionServerConfig } from '@/modules/multiregion/domain/types' import { MaybeNullOrUndefined } from '@speckle/shared' let getter: GetProjectDb | undefined = undefined @@ -40,7 +39,7 @@ export const getRegionDb: GetRegionDb = async ({ regionKey }) => { throw new Error(`RegionKey ${regionKey} not available in config`) const newRegionConfig = regionConfigs[regionKey] - const regionDb = configureKnexClient(newRegionConfig).public + const regionDb = configureClient(newRegionConfig).public regionClients[regionKey] = regionDb } @@ -97,29 +96,11 @@ const initializeRegisteredRegionClients = async (): Promise => { throw new MisconfiguredEnvironmentError( `Missing region config for ${region.key} region` ) - return [region.key, configureKnexClient(regionConfigs[region.key]).public] + return [region.key, configureClient(regionConfigs[region.key]).public] }) ) } -const configureKnexClient = ( - config: RegionServerConfig -): { public: Knex; private?: Knex } => { - const knexConfig = createKnexConfig({ - connectionString: config.postgres.connectionUri, - caCertificate: config.postgres.publicTlsCertificate - }) - const privateConfig = config.postgres.privateConnectionUri - ? knex( - createKnexConfig({ - connectionString: config.postgres.privateConnectionUri, - caCertificate: config.postgres.publicTlsCertificate - }) - ) - : undefined - return { public: knex(knexConfig), private: privateConfig } -} - export const getRegisteredRegionClients = async (): Promise => { if (!registeredRegionClients) registeredRegionClients = await initializeRegisteredRegionClients() @@ -139,12 +120,12 @@ export const initializeRegion: InitializeRegion = async ({ regionKey }) => { throw new Error(`RegionKey ${regionKey} not available in config`) const newRegionConfig = regionConfigs[regionKey] - const regionDb = configureKnexClient(newRegionConfig) + const regionDb = configureClient(newRegionConfig) await regionDb.public.migrate.latest() // TODO, set up pub-sub shit const mainDbConfig = await getMainRegionConfig() - const mainDb = configureKnexClient(mainDbConfig) + const mainDb = configureClient(mainDbConfig) const sslmode = newRegionConfig.postgres.publicTlsCertificate ? 'require' : 'disable' diff --git a/packages/server/modules/multiregion/domain/operations.ts b/packages/server/modules/multiregion/domain/operations.ts index 336d40bed..55e7e4b70 100644 --- a/packages/server/modules/multiregion/domain/operations.ts +++ b/packages/server/modules/multiregion/domain/operations.ts @@ -1,6 +1,6 @@ import { - MultiRegionConfig, ProjectRegion, + DataRegionsConfig, RegionKey, ServerRegion } from '@/modules/multiregion/domain/types' @@ -18,7 +18,7 @@ export type UpdateRegion = (params: { region: Partial }) => Promise -export type GetAvailableRegionConfig = () => Promise +export type GetAvailableRegionConfig = () => Promise export type GetAvailableRegionKeys = () => Promise export type GetFreeRegionKeys = () => Promise diff --git a/packages/server/modules/multiregion/domain/types.ts b/packages/server/modules/multiregion/domain/types.ts index 9ec20a254..01a7149d2 100644 --- a/packages/server/modules/multiregion/domain/types.ts +++ b/packages/server/modules/multiregion/domain/types.ts @@ -1,16 +1,11 @@ -import { z } from 'zod' -import { - multiRegionConfigSchema, - regionServerConfigSchema -} from '@/modules/multiregion/helpers/validation' import { RegionRecord } from '@/modules/multiregion/helpers/types' import { Nullable } from '@speckle/shared' +import { + DataRegionsConfig, + RegionServerConfig +} from '@speckle/shared/dist/commonjs/environment/multiRegionConfig.js' -export type AllRegionsConfig = z.infer -export type MainRegionConfig = AllRegionsConfig['main'] -export type MultiRegionConfig = AllRegionsConfig['regions'] -export type RegionServerConfig = z.infer - +export { RegionServerConfig, DataRegionsConfig } export type ServerRegion = RegionRecord export type RegionKey = Nullable diff --git a/packages/server/modules/multiregion/helpers/validation.ts b/packages/server/modules/multiregion/helpers/validation.ts deleted file mode 100644 index ffe85109c..000000000 --- a/packages/server/modules/multiregion/helpers/validation.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { z } from 'zod' - -export const regionServerConfigSchema = z.object({ - postgres: z.object({ - connectionUri: z - .string() - .describe( - 'Full Postgres connection URI (e.g. "postgres://user:password@host:port/dbname")' - ), - privateConnectionUri: z - .string() - .describe( - 'Full Postgres connection URI in VPN or Docker networks (e.g. "postgres://user:password@host:port/dbname")' - ) - .optional(), - publicTlsCertificate: z - .string() - .describe('Public TLS ("CA") certificate for the Postgres server') - .optional() - }) - //TODO - add the rest of the config when blob storage is implemented - // blobStorage: z - // .object({ - // endpoint: z.string().url(), - // accessKey: z.string(), - // secretKey: z.string(), - // bucket: z.string() - // }) -}) - -export const multiRegionConfigSchema = z.object({ - main: regionServerConfigSchema, - regions: z.record(z.string(), regionServerConfigSchema) -}) diff --git a/packages/server/modules/multiregion/regionConfig.ts b/packages/server/modules/multiregion/regionConfig.ts index 2b8e250d2..3b98a4300 100644 --- a/packages/server/modules/multiregion/regionConfig.ts +++ b/packages/server/modules/multiregion/regionConfig.ts @@ -1,68 +1,42 @@ import { GetAvailableRegionConfig } from '@/modules/multiregion/domain/operations' -import { AllRegionsConfig } from '@/modules/multiregion/domain/types' import { packageRoot } from '@/bootstrap' import path from 'node:path' -import fs from 'node:fs/promises' import { getMultiRegionConfigPath, isDevOrTestEnv } from '@/modules/shared/helpers/envHelper' import { type Optional } from '@speckle/shared' -import { multiRegionConfigSchema } from '@/modules/multiregion/helpers/validation' -import { MisconfiguredEnvironmentError } from '@/modules/shared/errors' -import { get } from 'lodash' import { isMultiRegionEnabled } from '@/modules/multiregion/helpers' +import { + MainRegionConfig, + MultiRegionConfig, + loadMultiRegionsConfig +} from '@speckle/shared/dist/commonjs/environment/multiRegionConfig.js' -let multiRegionConfig: Optional = undefined +let multiRegionConfig: Optional = undefined -const getAllRegionsConfig = async (): Promise => { +const getMultiRegionConfig = async (): Promise => { if (isDevOrTestEnv() && !isMultiRegionEnabled()) // this should throw somehow return { main: { postgres: { connectionUri: '' } }, regions: {} } - if (multiRegionConfig) return multiRegionConfig + if (!multiRegionConfig) { + const relativePath = getMultiRegionConfigPath() - const relativePath = getMultiRegionConfigPath() + const configPath = path.resolve(packageRoot, relativePath) - const fullPath = path.resolve(packageRoot, relativePath) - - let file: string - try { - file = await fs.readFile(fullPath, 'utf-8') - } catch (e) { - if (get(e, 'code') === 'ENOENT') { - throw new MisconfiguredEnvironmentError( - `Multi-region config file not found at path: ${fullPath}` - ) - } - - throw e + multiRegionConfig = await loadMultiRegionsConfig({ + path: configPath + }) } - let parsedJson: string - try { - parsedJson = JSON.parse(file) // This will throw if the file is not valid JSON - } catch (e) { - throw new MisconfiguredEnvironmentError( - `Multi-region config file at path '${fullPath}' is not valid JSON` - ) - } - - const multiRegionConfigFileResult = multiRegionConfigSchema.safeParse(parsedJson) // This will throw if the config is invalid - if (!multiRegionConfigFileResult.success) - throw new MisconfiguredEnvironmentError( - `Multi-region config file at path '${fullPath}' does not fit the schema`, - { cause: multiRegionConfigFileResult.error, info: { parsedJson } } - ) - - multiRegionConfig = multiRegionConfigFileResult.data return multiRegionConfig } -export const getMainRegionConfig = async (): Promise => { - return (await getAllRegionsConfig()).main +export const getMainRegionConfig = async (): Promise => { + return (await getMultiRegionConfig()).main } export const getAvailableRegionConfig: GetAvailableRegionConfig = async () => { - return (await getAllRegionsConfig()).regions + return (await getMultiRegionConfig()).regions } diff --git a/packages/server/modules/multiregion/tests/e2e/serverAdmin.graph.spec.ts b/packages/server/modules/multiregion/tests/e2e/serverAdmin.graph.spec.ts index df6e9f1cb..4064b9412 100644 --- a/packages/server/modules/multiregion/tests/e2e/serverAdmin.graph.spec.ts +++ b/packages/server/modules/multiregion/tests/e2e/serverAdmin.graph.spec.ts @@ -1,4 +1,4 @@ -import { MultiRegionConfig } from '@/modules/multiregion/domain/types' +import { DataRegionsConfig } from '@/modules/multiregion/domain/types' import { Regions } from '@/modules/multiregion/repositories' import { BasicTestUser, createTestUser } from '@/test/authHelper' import { @@ -27,7 +27,7 @@ describe.skip('Multi Region Server Settings', () => { const fakeRegionKey1 = 'us-west-1' const fakeRegionKey2 = 'eu-east-2' - const fakeRegionConfig: MultiRegionConfig = { + const fakeRegionConfig: DataRegionsConfig = { [fakeRegionKey1]: { postgres: { connectionUri: 'postgres://user:password@uswest1:port/dbname' diff --git a/packages/shared/package.json b/packages/shared/package.json index 3191ee7a0..ff47d9be9 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -38,6 +38,7 @@ }, "peerDependencies": { "@tiptap/core": "^2.0.0-beta.176", + "knex": "*", "mixpanel": "^0.17.0", "pino": "^8.7.0", "pino-http": "^8.0.0", @@ -55,6 +56,7 @@ "@typescript-eslint/parser": "^7.12.0", "eslint": "^9.4.0", "eslint-config-prettier": "^9.1.0", + "knex": "^2.4.1", "mixpanel": "^0.17.0", "pino": "^8.7.0", "pino-http": "^8.0.0", diff --git a/packages/shared/src/environment/multiRegionConfig.ts b/packages/shared/src/environment/multiRegionConfig.ts new file mode 100644 index 000000000..d0874be6a --- /dev/null +++ b/packages/shared/src/environment/multiRegionConfig.ts @@ -0,0 +1,154 @@ +import { z } from 'zod' +import fs from 'node:fs/promises' +import { Knex, knex } from 'knex' +import { Logger } from 'pino' + +export const regionConfigSchema = z.object({ + postgres: z.object({ + connectionUri: z + .string() + .describe( + 'Full Postgres connection URI (e.g. "postgres://user:password@host:port/dbname")' + ), + privateConnectionUri: z + .string() + .describe( + 'Full Postgres connection URI in VPN or Docker networks (e.g. "postgres://user:password@host:port/dbname")' + ) + .optional(), + publicTlsCertificate: z + .string() + .describe('Public TLS ("CA") certificate for the Postgres server') + .optional() + }) + //TODO - add the rest of the config when blob storage is implemented + // blobStorage: z + // .object({ + // endpoint: z.string().url(), + // accessKey: z.string(), + // secretKey: z.string(), + // bucket: z.string() + // }) +}) + +export const multiRegionConfigSchema = z.object({ + main: regionConfigSchema, + regions: z.record(z.string(), regionConfigSchema) +}) + +export type MultiRegionConfig = z.infer +export type MainRegionConfig = MultiRegionConfig['main'] +export type DataRegionsConfig = MultiRegionConfig['regions'] +export type RegionServerConfig = z.infer + +export const loadMultiRegionsConfig = async ({ + path +}: { + path: string +}): Promise => { + let file: string + try { + file = await fs.readFile(path, 'utf-8') + } catch (e) { + if (e instanceof Error && 'code' in e && e.code === 'ENOENT') { + throw new Error(`Multi-region config file not found at path: ${path}`) + } + throw e + } + + let parsedJson: Record + try { + parsedJson = JSON.parse(file) as Record // This will throw if the file is not valid JSON + } catch (e) { + throw new Error(`Multi-region config file at path '${path}' is not valid JSON`) + } + + const multiRegionConfigFileResult = multiRegionConfigSchema.safeParse(parsedJson) // This will throw if the config is invalid + if (!multiRegionConfigFileResult.success) + throw new Error( + `Multi-region config file at path '${path}' does not fit the schema: ${multiRegionConfigFileResult.error}` + ) + + return multiRegionConfigFileResult.data +} + +export type KnexConfigArgs = { + migrationDirs: string[] + isTestEnv: boolean + isDevOrTestEnv: boolean + logger: Logger + maxConnections: number + applicationName: string +} + +export const createKnexConfig = ({ + connectionString, + migrationDirs, + isTestEnv, + isDevOrTestEnv, + logger, + maxConnections, + caCertificate +}: { + connectionString?: string | undefined + caCertificate?: string | undefined +} & KnexConfigArgs): Knex.Config => { + return { + client: 'pg', + migrations: { + extension: 'ts', + loadExtensions: isTestEnv ? ['.js', '.ts'] : ['.js'], + directory: migrationDirs + }, + log: { + warn(message: unknown) { + logger.warn(message) + }, + error(message: unknown) { + logger.error(message) + }, + deprecate(message: unknown) { + logger.info(message) + }, + debug(message: unknown) { + logger.debug(message) + } + }, + connection: { + connectionString, + ssl: caCertificate ? { ca: caCertificate, rejectUnauthorized: true } : undefined, + // eslint-disable-next-line camelcase + application_name: 'speckle_server' + }, + // we wish to avoid leaking sql queries in the logs: https://knexjs.org/guide/#compilesqlonerror + compileSqlOnError: isDevOrTestEnv, + asyncStackTraces: isDevOrTestEnv, + pool: { + min: 0, + max: maxConnections, + acquireTimeoutMillis: 16000, //allows for 3x creation attempts plus idle time between attempts + createTimeoutMillis: 5000 + } + } +} + +export const configureKnexClient = ( + config: RegionServerConfig, + configArgs: KnexConfigArgs +): { public: Knex; private?: Knex } => { + const knexConfig = createKnexConfig({ + connectionString: config.postgres.connectionUri, + caCertificate: config.postgres.publicTlsCertificate, + ...configArgs + }) + const privateConfig = config.postgres.privateConnectionUri + ? knex( + createKnexConfig({ + connectionString: config.postgres.privateConnectionUri, + caCertificate: config.postgres.publicTlsCertificate, + ...configArgs + }) + ) + : undefined + return { public: knex(knexConfig), private: privateConfig } +} diff --git a/packages/webhook-service/.vscode/launch.json b/packages/webhook-service/.vscode/launch.json new file mode 100644 index 000000000..9f22179fd --- /dev/null +++ b/packages/webhook-service/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Launch Webhook service", + "type": "node", + "request": "launch", + "runtimeExecutable": "yarn", + "runtimeArgs": ["dev"], + "skipFiles": ["/**"], + "env": { + "FF_WORKSPACES_MULTI_REGION_ENABLED": "false" + } + } + ] +} diff --git a/packages/webhook-service/src/knex.js b/packages/webhook-service/src/knex.js index 5cc4adad8..9daade342 100644 --- a/packages/webhook-service/src/knex.js +++ b/packages/webhook-service/src/knex.js @@ -1,18 +1,51 @@ -/* eslint-disable camelcase */ 'use strict' +const Environment = require('@speckle/shared/dist/commonjs/environment/index.js') +const { + loadMultiRegionsConfig, + configureKnexClient +} = require('@speckle/shared/dist/commonjs/environment/multiRegionConfig.js') +const { logger } = require('./observability/logging') -module.exports = require('knex')({ - client: 'pg', - connection: { - application_name: 'speckle_webhook_service', - connectionString: - process.env.PG_CONNECTION_STRING || 'postgres://speckle:speckle@127.0.0.1/speckle' - }, - pool: { - min: 0, - max: parseInt(process.env.POSTGRES_MAX_CONNECTIONS_WEBHOOK_SERVICE) || 1, - acquireTimeoutMillis: 16000, //allows for 3x creation attempts plus idle time between attempts - createTimeoutMillis: 5000 +const { FF_WORKSPACES_MULTI_REGION_ENABLED } = Environment.getFeatureFlags() + +const isDevEnv = process.env.NODE_ENV !== 'production' + +let dbClients +const getDbClients = async () => { + if (dbClients) return dbClients + const maxConnections = + parseInt(process.env.POSTGRES_MAX_CONNECTIONS_WEBHOOK_SERVICE) || 1 + + const configArgs = { + migrationDirs: [], + isTestEnv: isDevEnv, + isDevOrTestEnv: isDevEnv, + logger, + maxConnections, + applicationName: 'speckle_webhook_service' } - // migrations are in managed in the server package -}) + if (!FF_WORKSPACES_MULTI_REGION_ENABLED) { + const mainClient = configureKnexClient( + { + postgres: { + connectionUri: + process.env.PG_CONNECTION_STRING || + 'postgres://speckle:speckle@127.0.0.1/speckle' + } + }, + configArgs + ) + dbClients = { main: mainClient } + } else { + const configPath = process.env.MULTI_REGION_CONFIG_PATH || 'multiregion.json' + const config = await loadMultiRegionsConfig({ path: configPath }) + const clients = [['main', configureKnexClient(config.main, configArgs)]] + Object.entries(config.regions).map(([key, config]) => { + clients.push([key, configureKnexClient(config, configArgs)]) + }) + dbClients = Object.fromEntries(clients) + } + return dbClients +} + +module.exports = getDbClients diff --git a/packages/webhook-service/src/main.js b/packages/webhook-service/src/main.js index ca2e1b9b2..69597f115 100644 --- a/packages/webhook-service/src/main.js +++ b/packages/webhook-service/src/main.js @@ -1,7 +1,7 @@ 'use strict' const crypto = require('crypto') -const knex = require('./knex') +const getDbClients = require('./knex') const fs = require('fs') const metrics = require('./observability/prometheusMetrics') const { logger } = require('./observability/logging') @@ -12,10 +12,8 @@ const HEALTHCHECK_FILE_PATH = '/tmp/last_successful_query' const { makeNetworkRequest } = require('./webhookCaller') const WebhookError = require('./errors') -const startTaskFactory = - ({ db }) => - async () => { - const { rows } = await db.raw(` +const startTask = async (db) => { + const { rows } = await db.raw(` UPDATE webhooks_events SET "status" = 1, @@ -29,16 +27,14 @@ const startTaskFactory = WHERE webhooks_events."id" = task."id" RETURNING webhooks_events."id" `) - return rows[0] - } + return rows[0] +} -const doTaskFactory = - ({ db }) => - async (task) => { - let boundLogger = logger.child({ taskId: task.id }) - try { - const { rows } = await db.raw( - ` +const doTask = async (db, task) => { + let boundLogger = logger.child({ taskId: task.id }) + try { + const { rows } = await db.raw( + ` SELECT ev.payload as evt, cnf.id as wh_id, cnf.url as wh_url, cnf.secret as wh_secret, cnf.enabled as wh_enabled @@ -47,49 +43,49 @@ const doTaskFactory = WHERE ev.id = ? LIMIT 1 `, - [task.id] + [task.id] + ) + const info = rows[0] + if (!info) { + throw new Error('Internal error: DB inconsistent') + } + boundLogger = boundLogger.child({ webhookId: info.wh_id }) + + const fullPayload = JSON.parse(info.evt) + boundLogger = boundLogger.child({ + streamId: fullPayload.streamId, + eventName: fullPayload.event.event_name + }) + + const postData = { payload: fullPayload } + + const signature = crypto + .createHmac('sha256', info.wh_secret || '') + .update(JSON.stringify(postData)) + .digest('hex') + const postHeaders = { 'X-WEBHOOK-SIGNATURE': signature } + + boundLogger.info('Calling webhook.') + const result = await makeNetworkRequest({ + url: info.wh_url, + data: postData, + headersData: postHeaders, + logger: boundLogger + }) + + boundLogger.info({ result }, `Received response from webhook.`) + + if (!result.success) { + throw new WebhookError( + result.error, + 'Calling webhook was unsuccessful.', + result.responseCode, + result.responseBody ) - const info = rows[0] - if (!info) { - throw new Error('Internal error: DB inconsistent') - } - boundLogger = boundLogger.child({ webhookId: info.wh_id }) + } - const fullPayload = JSON.parse(info.evt) - boundLogger = boundLogger.child({ - streamId: fullPayload.streamId, - eventName: fullPayload.event.event_name - }) - - const postData = { payload: fullPayload } - - const signature = crypto - .createHmac('sha256', info.wh_secret || '') - .update(JSON.stringify(postData)) - .digest('hex') - const postHeaders = { 'X-WEBHOOK-SIGNATURE': signature } - - boundLogger.info('Calling webhook.') - const result = await makeNetworkRequest({ - url: info.wh_url, - data: postData, - headersData: postHeaders, - logger: boundLogger - }) - - boundLogger.info({ result }, `Received response from webhook.`) - - if (!result.success) { - throw new WebhookError( - result.error, - 'Calling webhook was unsuccessful.', - result.responseCode, - result.responseBody - ) - } - - await db.raw( - ` + await db.raw( + ` UPDATE webhooks_events SET "status" = 2, @@ -97,18 +93,18 @@ const doTaskFactory = "statusInfo" = 'Webhook called' WHERE "id" = ? `, - [task.id] - ) - } catch (err) { - switch (err.constructor) { - case WebhookError: - boundLogger.warn({ err }, 'Failed to trigger webhook event.') - break - default: - boundLogger.error(err, 'Failed to trigger webhook event.') - } - await db.raw( - ` + [task.id] + ) + } catch (err) { + switch (err.constructor) { + case WebhookError: + boundLogger.warn({ err }, 'Failed to trigger webhook event.') + break + default: + boundLogger.error(err, 'Failed to trigger webhook event.') + } + await db.raw( + ` UPDATE webhooks_events SET "status" = 3, @@ -116,43 +112,46 @@ const doTaskFactory = "statusInfo" = ? WHERE "id" = ? `, - [err.toString(), task.id] + [err.toString(), task.id] + ) + metrics.metricOperationErrors.labels('webhook').inc() + } +} + +const doStuff = async (dbClients) => { + while (!shouldExit) { + const tasks = ( + await Promise.all( + dbClients.map(async (db) => { + fs.writeFile(HEALTHCHECK_FILE_PATH, '' + Date.now(), () => {}) + const task = await startTask(db) + if (!task) return + return [db, task] + }) ) - metrics.metricOperationErrors.labels('webhook').inc() - } - } - -const tickFactory = - ({ doTask, startTask, tick }) => - async () => { - if (shouldExit) { - process.exit(0) - } - - try { - const task = await startTask() - - fs.writeFile(HEALTHCHECK_FILE_PATH, '' + Date.now(), () => {}) - - if (!task) { - setTimeout(tick, 1000) - return - } - - const metricDurationEnd = metrics.metricDuration.startTimer() - - await doTask(task) - - metricDurationEnd({ op: 'webhook' }) - - // Check for another task very soon - setTimeout(tick, 10) - } catch (err) { - metrics.metricOperationErrors.labels('main_loop').inc() - logger.error(err, 'Error executing task') - setTimeout(tick, 5000) + ).filter((t) => t) + if (!tasks.length) { + await new Promise((r) => setTimeout(r, 1000)) + continue } + + await Promise.all( + tasks.map(async ([db, task]) => { + try { + const metricDurationEnd = metrics.metricDuration.startTimer() + + await doTask(db, task) + + metricDurationEnd({ op: 'webhook' }) + } catch (err) { + metrics.metricOperationErrors.labels('main_loop').inc() + logger.error(err, 'Error executing task') + } + }) + ) } + process.exit(0) +} async function main() { logger.info('Starting Webhook Service...') @@ -161,14 +160,11 @@ async function main() { shouldExit = true logger.info('Shutting down...') }) - metrics.initPrometheusMetrics() + await metrics.initPrometheusMetrics() - const tick = tickFactory({ - doTask: doTaskFactory({ db: knex }), - startTask: startTaskFactory({ db: knex }), - tick: (...args) => tick(...args) - }) - tick() + const dbClients = Object.values(await getDbClients()).map((client) => client.public) + + await doStuff(dbClients) } main() diff --git a/packages/webhook-service/src/observability/prometheusMetrics.js b/packages/webhook-service/src/observability/prometheusMetrics.js index a43602de6..51094dbcc 100644 --- a/packages/webhook-service/src/observability/prometheusMetrics.js +++ b/packages/webhook-service/src/observability/prometheusMetrics.js @@ -3,7 +3,7 @@ const http = require('http') const prometheusClient = require('prom-client') -const knex = require('../knex') +const getDbClients = require('../knex') let metricFree = null let metricUsed = null @@ -24,7 +24,8 @@ prometheusClient.collectDefaultMetrics() let prometheusInitialized = false -function initKnexPrometheusMetrics() { +async function initKnexPrometheusMetrics() { + const knex = (await getDbClients()).main.public metricFree = new prometheusClient.Gauge({ name: 'speckle_server_knex_free', help: 'Number of free DB connections', @@ -114,11 +115,11 @@ function initKnexPrometheusMetrics() { } module.exports = { - initPrometheusMetrics() { + async initPrometheusMetrics() { if (prometheusInitialized) return prometheusInitialized = true - initKnexPrometheusMetrics() + await initKnexPrometheusMetrics() // Define the HTTP server const server = http.createServer(async (req, res) => { diff --git a/yarn.lock b/yarn.lock index cd0ecf834..308e16502 100644 --- a/yarn.lock +++ b/yarn.lock @@ -17134,6 +17134,7 @@ __metadata: "@typescript-eslint/parser": "npm:^7.12.0" eslint: "npm:^9.4.0" eslint-config-prettier: "npm:^9.1.0" + knex: "npm:^2.4.1" lodash: "npm:^4.17.21" lodash-es: "npm:^4.17.21" mixpanel: "npm:^0.17.0" @@ -17147,6 +17148,7 @@ __metadata: zod: "npm:^3.22.4" peerDependencies: "@tiptap/core": ^2.0.0-beta.176 + knex: "*" mixpanel: ^0.17.0 pino: ^8.7.0 pino-http: ^8.0.0 From 9e383f83b174bef1a105536d705850160f534bb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20Jedlicska?= <57442769+gjedlicska@users.noreply.github.com> Date: Mon, 11 Nov 2024 18:10:59 +0100 Subject: [PATCH 10/10] fix(workers): add zod and znv (#3484) --- packages/fileimport-service/package.json | 4 +++- packages/preview-service/package.json | 3 ++- packages/webhook-service/package.json | 4 +++- yarn.lock | 9 +++++++-- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/packages/fileimport-service/package.json b/packages/fileimport-service/package.json index 86cdbd6cc..201e087e3 100644 --- a/packages/fileimport-service/package.json +++ b/packages/fileimport-service/package.json @@ -34,7 +34,9 @@ "prom-client": "^14.0.1", "undici": "^5.28.4", "valid-filename": "^3.1.0", - "web-ifc": "^0.0.36" + "web-ifc": "^0.0.36", + "znv": "^0.4.0", + "zod": "^3.22.4" }, "devDependencies": { "cross-env": "^7.0.3", diff --git a/packages/preview-service/package.json b/packages/preview-service/package.json index 62fbec1f5..cd475b923 100644 --- a/packages/preview-service/package.json +++ b/packages/preview-service/package.json @@ -59,7 +59,8 @@ "tarn": "^3.0.2", "yargs": "^17.3.0", "zlib": "^1.0.5", - "zod": "^3.23.8" + "znv": "^0.4.0", + "zod": "^3.22.4" }, "devDependencies": { "@aws-sdk/client-s3": "^3.645.0", diff --git a/packages/webhook-service/package.json b/packages/webhook-service/package.json index f948eaa8b..2b96938dd 100644 --- a/packages/webhook-service/package.json +++ b/packages/webhook-service/package.json @@ -30,7 +30,9 @@ "pino-pretty": "^9.1.1", "private-ip": "^2.3.3", "prom-client": "^14.0.1", - "verror": "^1.10.1" + "verror": "^1.10.1", + "znv": "^0.4.0", + "zod": "^3.22.4" }, "devDependencies": { "cross-env": "^7.0.3", diff --git a/yarn.lock b/yarn.lock index 308e16502..b450ee72a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -16593,6 +16593,8 @@ __metadata: undici: "npm:^5.28.4" valid-filename: "npm:^3.1.0" web-ifc: "npm:^0.0.36" + znv: "npm:^0.4.0" + zod: "npm:^3.22.4" languageName: unknown linkType: soft @@ -16930,7 +16932,8 @@ __metadata: webpack-dev-server: "npm:^4.6.0" yargs: "npm:^17.3.0" zlib: "npm:^1.0.5" - zod: "npm:^3.23.8" + znv: "npm:^0.4.0" + zod: "npm:^3.22.4" languageName: unknown linkType: soft @@ -17359,6 +17362,8 @@ __metadata: private-ip: "npm:^2.3.3" prom-client: "npm:^14.0.1" verror: "npm:^1.10.1" + znv: "npm:^0.4.0" + zod: "npm:^3.22.4" languageName: unknown linkType: soft @@ -54647,7 +54652,7 @@ __metadata: languageName: node linkType: hard -"zod@npm:3.23.8, zod@npm:^3.23.8": +"zod@npm:3.23.8": version: 3.23.8 resolution: "zod@npm:3.23.8" checksum: 10/846fd73e1af0def79c19d510ea9e4a795544a67d5b34b7e1c4d0425bf6bfd1c719446d94cdfa1721c1987d891321d61f779e8236fde517dc0e524aa851a6eff1