chore(server): refactor activityStream invocations - batch #4 - commits

This commit is contained in:
Kristaps Fabians Geikins
2025-01-23 14:10:21 +02:00
parent 5d1a46d541
commit ac88c503e3
22 changed files with 484 additions and 481 deletions
@@ -1,11 +1,8 @@
import {
AddCommitDeletedActivity,
AddCommitMovedActivity
} from '@/modules/activitystream/domain/operations'
import {
GetStreamBranchByName,
StoreBranch
} from '@/modules/core/domain/branches/operations'
import { VersionEvents } from '@/modules/core/domain/commits/events'
import {
DeleteCommits,
GetCommits,
@@ -22,10 +19,17 @@ import {
CommitsDeleteInput,
CommitsMoveInput,
DeleteVersionsInput,
MoveVersionsInput
MoveVersionsInput,
ProjectVersionsUpdatedMessageType
} from '@/modules/core/graph/generated/graphql'
import { Roles } from '@/modules/core/helpers/mainConstants'
import { CommitPubsubEvents } from '@/modules/shared'
import { ensureError } from '@/modules/shared/helpers/errorHelper'
import { EventBusEmit } from '@/modules/shared/services/eventBus'
import {
ProjectSubscriptions,
PublishSubscription
} from '@/modules/shared/utils/subscriptions'
import { difference, groupBy, has, keyBy } from 'lodash'
type OldBatchInput = CommitsMoveInput | CommitsDeleteInput
@@ -153,7 +157,8 @@ export const batchMoveCommitsFactory =
deps: ValidateCommitsMoveDeps & {
createBranch: StoreBranch
moveCommitsToBranch: MoveCommitsToBranch
addCommitMovedActivity: AddCommitMovedActivity
emitEvent: EventBusEmit
publishSub: PublishSubscription
}
): ValidateAndBatchMoveCommits =>
async (params: CommitsMoveInput | MoveVersionsInput, userId: string) => {
@@ -177,16 +182,31 @@ export const batchMoveCommitsFactory =
await deps.moveCommitsToBranch(commitIds, finalBranch.id)
await Promise.all(
commitsWithStreams.map(({ commit, stream }) =>
deps.addCommitMovedActivity({
commitId: commit.id,
streamId: stream.id,
userId,
commit,
originalBranchId: commit.branchId,
newBranchId: finalBranch.id
})
)
commitsWithStreams.map(async ({ commit, stream }) => {
await Promise.all([
deps.emitEvent({
eventName: VersionEvents.MovedModel,
payload: {
versionId: commit.id,
projectId: stream.id,
userId,
originalModelId: commit.branchId,
newModelId: finalBranch.id,
version: commit
}
}),
// TODO: Move to event bus listeners
deps.publishSub(ProjectSubscriptions.ProjectVersionsUpdated, {
projectId: stream.id,
projectVersionsUpdated: {
id: commit.id,
version: { ...commit, streamId: stream.id },
type: ProjectVersionsUpdatedMessageType.Updated,
modelId: finalBranch.id
}
})
])
})
)
return finalBranch
} catch (e) {
@@ -202,7 +222,8 @@ export const batchDeleteCommitsFactory =
(
deps: ValidateBatchBaseRulesDeps & {
deleteCommits: DeleteCommits
addCommitDeletedActivity: AddCommitDeletedActivity
emitEvent: EventBusEmit
publishSub: PublishSubscription
}
): ValidateAndBatchDeleteCommits =>
async (params: CommitsDeleteInput | DeleteVersionsInput, userId: string) => {
@@ -216,15 +237,38 @@ export const batchDeleteCommitsFactory =
try {
await deps.deleteCommits(commitIds)
await Promise.all(
commitsWithStreams.map(({ commit, stream }) =>
deps.addCommitDeletedActivity({
commitId: commit.id,
streamId: stream.id,
userId,
commit,
branchId: commit.branchId
})
)
commitsWithStreams.map(async ({ commit, stream }) => {
await Promise.all([
deps.emitEvent({
eventName: VersionEvents.Deleted,
payload: {
projectId: stream.id,
modelId: commit.branchId,
versionId: commit.id,
userId,
version: commit
}
}),
// TODO: Move to event bus listeners
deps.publishSub(CommitPubsubEvents.CommitDeleted, {
commitDeleted: {
...commit,
streamId: stream.id,
branchId: commit.branchId
},
streamId: stream.id
}),
deps.publishSub(ProjectSubscriptions.ProjectVersionsUpdated, {
projectId: stream.id,
projectVersionsUpdated: {
id: commit.id,
type: ProjectVersionsUpdatedMessageType.Deleted,
version: null,
modelId: commit.branchId
}
})
])
})
)
} catch (e) {
const err = ensureError(e)
@@ -1,10 +1,3 @@
import {
AddCommitCreatedActivity,
AddCommitDeletedActivity,
AddCommitUpdatedActivity,
SaveActivity
} from '@/modules/activitystream/domain/operations'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
import {
GetBranchById,
GetStreamBranchByName,
@@ -41,15 +34,21 @@ import {
CommitReceivedInput,
CommitUpdateInput,
MarkReceivedVersionInput,
ProjectVersionsUpdatedMessageType,
UpdateVersionInput
} from '@/modules/core/graph/generated/graphql'
import { BranchRecord, CommitRecord } from '@/modules/core/helpers/types'
import { CommitPubsubEvents } from '@/modules/shared'
import { EventBusEmit } from '@/modules/shared/services/eventBus'
import {
ProjectSubscriptions,
PublishSubscription
} from '@/modules/shared/utils/subscriptions'
import { ensureError, Roles } from '@speckle/shared'
import { has } from 'lodash'
export const markCommitReceivedAndNotifyFactory =
({ getCommit, saveActivity }: { getCommit: GetCommit; saveActivity: SaveActivity }) =>
({ getCommit, emitEvent }: { getCommit: GetCommit; emitEvent: EventBusEmit }) =>
async (params: {
input: MarkReceivedVersionInput | CommitReceivedInput
userId: string
@@ -75,17 +74,15 @@ export const markCommitReceivedAndNotifyFactory =
)
}
await saveActivity({
streamId: oldInput.streamId,
resourceType: ResourceTypes.Commit,
resourceId: oldInput.commitId,
actionType: ActionTypes.Commit.Receive,
userId,
info: {
sourceApplication: input.sourceApplication,
message: input.message
},
message: `Commit ${oldInput.commitId} was received by user ${userId}`
await emitEvent({
eventName: VersionEvents.Received,
payload: {
projectId: oldInput.streamId,
versionId: oldInput.commitId,
userId,
sourceApplication: oldInput.sourceApplication,
message: oldInput.message
}
})
}
@@ -98,10 +95,10 @@ export const createCommitByBranchIdFactory =
insertBranchCommits: InsertBranchCommits
markCommitStreamUpdated: MarkCommitStreamUpdated
markCommitBranchUpdated: MarkCommitBranchUpdated
addCommitCreatedActivity: AddCommitCreatedActivity
emitEvent: EventBusEmit
publishSub: PublishSubscription
}): CreateCommitByBranchId =>
async (params, options) => {
async (params) => {
const {
streamId,
branchId,
@@ -111,7 +108,6 @@ export const createCommitByBranchIdFactory =
sourceApplication,
parents
} = params
const { notify = true } = options || {}
// If no total children count is passed in, get it from the original object
// that this commit references.
@@ -148,6 +144,10 @@ export const createCommitByBranchIdFactory =
deps.insertStreamCommits([{ streamId, commitId: id }])
])
const input = {
...params,
branchName: branch.name
}
await Promise.all([
deps.markCommitStreamUpdated(id),
deps.markCommitBranchUpdated(id),
@@ -156,27 +156,26 @@ export const createCommitByBranchIdFactory =
payload: {
projectId: streamId,
modelId: branchId,
version: commit
version: commit,
input,
modelName: branch.name,
userId: authorId
}
}),
...(notify
? [
deps.addCommitCreatedActivity({
commitId: commit.id,
streamId,
userId: authorId,
branchName: branch.name,
input: {
...commit,
branchName: branch.name,
objectId,
streamId
},
modelId: branch.id,
commit
})
]
: [])
// TODO: Move to event bus listeners
deps.publishSub(CommitPubsubEvents.CommitCreated, {
commitCreated: { ...input, id, authorId },
streamId
}),
deps.publishSub(ProjectSubscriptions.ProjectVersionsUpdated, {
projectId: streamId,
projectVersionsUpdated: {
id: commit.id,
version: { ...commit, streamId },
type: ProjectVersionsUpdatedMessageType.Created,
modelId: branchId
}
})
])
return { ...commit, streamId, branchId }
@@ -188,7 +187,7 @@ export const createCommitByBranchNameFactory =
getStreamBranchByName: GetStreamBranchByName
getBranchById: GetBranchById
}): CreateCommitByBranchName =>
async (params, options) => {
async (params) => {
const {
streamId,
objectId,
@@ -198,9 +197,6 @@ export const createCommitByBranchNameFactory =
parents,
totalChildrenCount
} = params
const { notify = true } = options || {}
const branchName = params.branchName.toLowerCase()
let myBranch = await deps.getStreamBranchByName(streamId, branchName)
if (!myBranch) {
@@ -215,19 +211,16 @@ export const createCommitByBranchNameFactory =
)
}
const commit = await deps.createCommitByBranchId(
{
streamId,
branchId: myBranch.id,
objectId,
authorId,
message,
sourceApplication,
totalChildrenCount,
parents
},
{ notify }
)
const commit = await deps.createCommitByBranchId({
streamId,
branchId: myBranch.id,
objectId,
authorId,
message,
sourceApplication,
totalChildrenCount,
parents
})
return commit
}
@@ -245,9 +238,10 @@ export const updateCommitAndNotifyFactory =
getCommitBranch: GetCommitBranch
switchCommitBranch: SwitchCommitBranch
updateCommit: UpdateCommit
addCommitUpdatedActivity: AddCommitUpdatedActivity
markCommitStreamUpdated: MarkCommitStreamUpdated
markCommitBranchUpdated: MarkCommitBranchUpdated
emitEvent: EventBusEmit
publishSub: PublishSubscription
}): UpdateCommitAndNotify =>
async (params: CommitUpdateInput | UpdateVersionInput, userId: string) => {
const {
@@ -323,19 +317,43 @@ export const updateCommitAndNotifyFactory =
}
if (commit) {
await deps.addCommitUpdatedActivity({
commitId,
streamId: stream.id,
userId,
originalCommit: commit,
update: params,
newCommit,
branchId: branch!.id
})
const legacyUpdateStruct: CommitUpdateInput = isOldVersionUpdateInput(params)
? params
: {
id: params.versionId,
message: params.message,
streamId: stream.id
}
const [updatedBranch] = await Promise.all([
deps.markCommitBranchUpdated(commit.id),
deps.markCommitStreamUpdated(commit.id)
deps.markCommitStreamUpdated(commit.id),
deps.emitEvent({
eventName: VersionEvents.Updated,
payload: {
projectId: stream.id,
modelId: branch!.id,
versionId: commitId,
newVersion: newCommit,
oldVersion: commit,
userId,
update: params
}
}),
// TODO: Move to event bus listeners
deps.publishSub(CommitPubsubEvents.CommitUpdated, {
commitUpdated: { ...legacyUpdateStruct },
streamId: stream.id,
commitId
}),
deps.publishSub(ProjectSubscriptions.ProjectVersionsUpdated, {
projectId: stream.id,
projectVersionsUpdated: {
id: commitId,
version: { ...newCommit, streamId: stream.id },
type: ProjectVersionsUpdatedMessageType.Updated,
modelId: branch!.id
}
})
])
branch = updatedBranch
}
@@ -349,7 +367,8 @@ export const deleteCommitAndNotifyFactory =
markCommitStreamUpdated: MarkCommitStreamUpdated
markCommitBranchUpdated: MarkCommitBranchUpdated
deleteCommit: DeleteCommit
addCommitDeletedActivity: AddCommitDeletedActivity
emitEvent: EventBusEmit
publishSub: PublishSubscription
}): DeleteCommitAndNotify =>
async (commitId: string, streamId: string, userId: string) => {
const commit = await deps.getCommit(commitId)
@@ -372,13 +391,32 @@ export const deleteCommitAndNotifyFactory =
const isDeleted = await deps.deleteCommit(commitId)
if (isDeleted) {
await deps.addCommitDeletedActivity({
commitId,
streamId,
userId,
commit,
branchId: updatedBranch.id
})
await Promise.all([
deps.emitEvent({
eventName: VersionEvents.Deleted,
payload: {
projectId: streamId,
modelId: updatedBranch.id,
versionId: commitId,
userId,
version: commit
}
}),
// TODO: Move to event bus listeners
deps.publishSub(CommitPubsubEvents.CommitDeleted, {
commitDeleted: { ...commit, streamId, branchId: updatedBranch.id },
streamId
}),
deps.publishSub(ProjectSubscriptions.ProjectVersionsUpdated, {
projectId: streamId,
projectVersionsUpdated: {
id: commitId,
type: ProjectVersionsUpdatedMessageType.Deleted,
version: null,
modelId: updatedBranch.id
}
})
])
}
return isDeleted