diff --git a/packages/server/modules/activitystream/domain/operations.ts b/packages/server/modules/activitystream/domain/operations.ts index 8bdcce5ec..480d650a4 100644 --- a/packages/server/modules/activitystream/domain/operations.ts +++ b/packages/server/modules/activitystream/domain/operations.ts @@ -253,26 +253,20 @@ export type AddCommitDeletedActivity = (params: { branchId: string }) => Promise -export type AddCommentCreatedActivity = (params: { - streamId: string - userId: string +export type AddThreadCreatedActivity = (params: { input: CommentCreatedActivityInput comment: CommentRecord }) => Promise export type AddCommentArchivedActivity = (params: { - streamId: string - commentId: string userId: string input: MutationCommentArchiveArgs comment: CommentRecord }) => Promise export type AddReplyAddedActivity = (params: { - streamId: string input: ReplyCreatedActivityInput reply: CommentRecord - userId: string }) => Promise export type AddBranchCreatedActivity = (params: { diff --git a/packages/server/modules/activitystream/events/commentListeners.ts b/packages/server/modules/activitystream/events/commentListeners.ts new file mode 100644 index 000000000..dc03d12f3 --- /dev/null +++ b/packages/server/modules/activitystream/events/commentListeners.ts @@ -0,0 +1,120 @@ +import { + AddThreadCreatedActivity, + AddReplyAddedActivity, + SaveActivity, + AddCommentArchivedActivity +} from '@/modules/activitystream/domain/operations' +import { + CommentCreatedActivityInput, + ReplyCreatedActivityInput +} from '@/modules/activitystream/domain/types' +import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types' +import { CommentEvents, CommentEventsPayloads } from '@/modules/comments/domain/events' +import { ReplyCreateInput } from '@/modules/core/graph/generated/graphql' +import { EventBusListen } from '@/modules/shared/services/eventBus' +import { has } from 'lodash' +import { OverrideProperties } from 'type-fest' + +const addThreadCreatedActivityFactory = + ({ saveActivity }: { saveActivity: SaveActivity }): AddThreadCreatedActivity => + async (params) => { + const { input, comment } = params + + await saveActivity({ + resourceId: comment.id, + streamId: comment.streamId, + resourceType: ResourceTypes.Comment, + actionType: ActionTypes.Comment.Create, + userId: comment.authorId, + info: { input }, + message: `Comment added: ${comment.id} (${input})` + }) + } + +const isLegacyReplyCreateInput = ( + i: ReplyCreatedActivityInput +): i is ReplyCreateInput => has(i, 'streamId') + +const addReplyAddedActivityFactory = + ({ saveActivity }: { saveActivity: SaveActivity }): AddReplyAddedActivity => + async (params) => { + const { input, reply } = params + + const parentCommentId = isLegacyReplyCreateInput(input) + ? input.parentComment + : input.threadId + await saveActivity({ + streamId: reply.streamId, + resourceType: ResourceTypes.Comment, + resourceId: parentCommentId, + actionType: ActionTypes.Comment.Reply, + userId: reply.authorId, + info: { input }, + message: `Comment reply #${reply.id} created` + }) + } + +const addCommentArchivedActivityFactory = + ({ saveActivity }: { saveActivity: SaveActivity }): AddCommentArchivedActivity => + async (params) => { + const { userId, input, comment } = params + + await saveActivity({ + streamId: comment.streamId, + resourceType: ResourceTypes.Comment, + resourceId: comment.id, + actionType: ActionTypes.Comment.Archive, + userId, + info: { input }, + message: `Comment #${comment.id} archived` + }) + } + +const isReplyCreatedPayload = ( + payload: CommentEventsPayloads[typeof CommentEvents.Created] +): payload is OverrideProperties< + CommentEventsPayloads[typeof CommentEvents.Created], + { + input: ReplyCreatedActivityInput + } +> => { + return payload.isThread === false +} + +const isThreadCreatedPayload = ( + payload: CommentEventsPayloads[typeof CommentEvents.Created] +): payload is OverrideProperties< + CommentEventsPayloads[typeof CommentEvents.Created], + { + input: CommentCreatedActivityInput + } +> => { + return payload.isThread +} + +export const reportCommentActivityFactory = + (deps: { eventListen: EventBusListen; saveActivity: SaveActivity }) => () => { + const addThreadCreatedActivity = addThreadCreatedActivityFactory(deps) + const addReplyAddedActivity = addReplyAddedActivityFactory(deps) + const addCommentArchivedActivity = addCommentArchivedActivityFactory(deps) + + const quitters = [ + deps.eventListen(CommentEvents.Created, async ({ payload }) => { + if (isReplyCreatedPayload(payload)) { + await addReplyAddedActivity({ + reply: payload.comment, + input: payload.input + }) + } else if (isThreadCreatedPayload(payload)) { + await addThreadCreatedActivity(payload) + } + }), + deps.eventListen(CommentEvents.Archived, async ({ payload }) => { + await addCommentArchivedActivity(payload) + }) + ] + + return () => { + quitters.forEach((quit) => quit()) + } + } diff --git a/packages/server/modules/activitystream/index.ts b/packages/server/modules/activitystream/index.ts index 62596a91b..883e9b3a4 100644 --- a/packages/server/modules/activitystream/index.ts +++ b/packages/server/modules/activitystream/index.ts @@ -30,6 +30,7 @@ import { reportUserActivityFactory } from '@/modules/activitystream/events/userL import { reportAccessRequestActivityFactory } from '@/modules/activitystream/events/accessRequestListeners' import { reportBranchActivityFactory } from '@/modules/activitystream/events/branchListeners' import { reportCommitActivityFactory } from '@/modules/activitystream/events/commitListeners' +import { reportCommentActivityFactory } from '@/modules/activitystream/events/commentListeners' let scheduledTask: ReturnType | null = null let quitEventListeners: Optional<() => void> = undefined @@ -62,12 +63,17 @@ const initializeEventListeners = ({ eventListen: eventBus.listen, saveActivity }) + const reportCommentActivity = reportCommentActivityFactory({ + eventListen: eventBus.listen, + saveActivity + }) const quitCbs = [ reportUserActivity(), reportAccessRequestActivity(), reportBranchActivity(), reportCommitActivity(), + reportCommentActivity(), eventBus.listen(ServerInvitesEvents.Created, async ({ payload }) => { if (!isProjectResourceTarget(payload.invite.resource)) return await onServerInviteCreatedFactory({ diff --git a/packages/server/modules/activitystream/services/commentActivity.ts b/packages/server/modules/activitystream/services/commentActivity.ts deleted file mode 100644 index 7b9807318..000000000 --- a/packages/server/modules/activitystream/services/commentActivity.ts +++ /dev/null @@ -1,200 +0,0 @@ -import { - AddCommentArchivedActivity, - AddCommentCreatedActivity, - AddReplyAddedActivity, - SaveActivity -} from '@/modules/activitystream/domain/operations' -import { - CommentCreatedActivityInput, - ReplyCreatedActivityInput -} from '@/modules/activitystream/domain/types' -import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types' -import { - GetViewerResourceItemsUngrouped, - GetViewerResourcesForComment, - GetViewerResourcesFromLegacyIdentifiers -} from '@/modules/comments/domain/operations' -import { ViewerResourceItem } from '@/modules/comments/domain/types' -import { - CommentCreateInput, - ProjectCommentsUpdatedMessageType, - ReplyCreateInput -} from '@/modules/core/graph/generated/graphql' -import { PublishSubscription, pubsub } from '@/modules/shared/utils/subscriptions' -import { - CommentSubscriptions, - ProjectSubscriptions -} from '@/modules/shared/utils/subscriptions' -import { has } from 'lodash' - -const isLegacyCommentCreateInput = ( - i: CommentCreatedActivityInput -): i is CommentCreateInput => has(i, 'streamId') - -export const addCommentCreatedActivityFactory = - ({ - getViewerResourceItemsUngrouped, - getViewerResourcesFromLegacyIdentifiers, - saveActivity, - publish - }: { - getViewerResourceItemsUngrouped: GetViewerResourceItemsUngrouped - getViewerResourcesFromLegacyIdentifiers: GetViewerResourcesFromLegacyIdentifiers - saveActivity: SaveActivity - publish: PublishSubscription - }): AddCommentCreatedActivity => - async (params) => { - const { streamId, userId, input, comment } = params - - let resourceIds: string - let resourceItems: ViewerResourceItem[] - if (isLegacyCommentCreateInput(input)) { - resourceIds = input.resources.map((res) => res?.resourceId).join(',') - - const validResources = input.resources.filter( - (r): r is NonNullable => !!r - ) - resourceItems = await getViewerResourcesFromLegacyIdentifiers( - streamId, - validResources - ) - } else { - resourceItems = - input.resolvedResourceItems || - (await getViewerResourceItemsUngrouped({ - projectId: streamId, - resourceIdString: input.resourceIdString - })) - resourceIds = resourceItems.map((i) => i.versionId || i.objectId).join(',') - } - - await Promise.all([ - saveActivity({ - resourceId: comment.id, - streamId, - resourceType: ResourceTypes.Comment, - actionType: ActionTypes.Comment.Create, - userId, - info: { input }, - message: `Comment added: ${comment.id} (${input})` - }), - // @deprecated unused in FE2 - pubsub.publish(CommentSubscriptions.CommentActivity, { - commentActivity: { - type: 'comment-added', - comment - }, - streamId, - resourceIds - }), - publish(ProjectSubscriptions.ProjectCommentsUpdated, { - projectCommentsUpdated: { - id: comment.id, - type: ProjectCommentsUpdatedMessageType.Created, - comment - }, - projectId: streamId, - resourceItems - }) - ]) - } - -/** - * Add comment archived/unarchived activity - */ -export const addCommentArchivedActivityFactory = - ({ - getViewerResourcesForComment, - saveActivity, - publish - }: { - getViewerResourcesForComment: GetViewerResourcesForComment - publish: PublishSubscription - saveActivity: SaveActivity - }): AddCommentArchivedActivity => - async (params) => { - const { streamId, commentId, userId, input, comment } = params - const isArchiving = !!input.archived - - await Promise.all([ - saveActivity({ - streamId, - resourceType: ResourceTypes.Comment, - resourceId: commentId, - actionType: ActionTypes.Comment.Archive, - userId, - info: { input }, - message: `Comment #${commentId} archived` - }), - // @deprecated not used in FE2 - pubsub.publish(CommentSubscriptions.CommentThreadActivity, { - commentThreadActivity: { - type: isArchiving ? 'comment-archived' : 'comment-added' - }, - streamId: input.streamId, - commentId: input.commentId - }), - publish(ProjectSubscriptions.ProjectCommentsUpdated, { - projectCommentsUpdated: { - id: commentId, - type: isArchiving - ? ProjectCommentsUpdatedMessageType.Archived - : ProjectCommentsUpdatedMessageType.Created, - comment: isArchiving ? null : comment - }, - projectId: streamId, - resourceItems: await getViewerResourcesForComment(streamId, comment.id) - }) - ]) - } - -const isLegacyReplyCreateInput = ( - i: ReplyCreatedActivityInput -): i is ReplyCreateInput => has(i, 'streamId') - -export const addReplyAddedActivityFactory = - ({ - getViewerResourcesForComment, - saveActivity, - publish - }: { - getViewerResourcesForComment: GetViewerResourcesForComment - publish: PublishSubscription - saveActivity: SaveActivity - }): AddReplyAddedActivity => - async (params) => { - const { streamId, input, reply, userId } = params - - const parentCommentId = isLegacyReplyCreateInput(input) - ? input.parentComment - : input.threadId - await Promise.all([ - saveActivity({ - streamId, - resourceType: ResourceTypes.Comment, - resourceId: parentCommentId, - actionType: ActionTypes.Comment.Reply, - userId, - info: { input }, - message: `Comment reply #${reply.id} created` - }), - // @deprecated - pubsub.publish(CommentSubscriptions.CommentThreadActivity, { - commentThreadActivity: { - type: 'reply-added', - reply - }, - streamId, - commentId: parentCommentId - }), - publish(ProjectSubscriptions.ProjectCommentsUpdated, { - projectCommentsUpdated: { - id: reply.id, - type: ProjectCommentsUpdatedMessageType.Created, - comment: reply - }, - projectId: streamId, - resourceItems: await getViewerResourcesForComment(streamId, reply.id) - }) - ]) - } diff --git a/packages/server/modules/cli/commands/download/commit.ts b/packages/server/modules/cli/commands/download/commit.ts index d86504eb8..e04394ae5 100644 --- a/packages/server/modules/cli/commands/download/commit.ts +++ b/packages/server/modules/cli/commands/download/commit.ts @@ -45,19 +45,13 @@ import { markCommentUpdatedFactory, markCommentViewedFactory } from '@/modules/comments/repositories/comments' -import { - addCommentCreatedActivityFactory, - addReplyAddedActivityFactory -} from '@/modules/activitystream/services/commentActivity' import { validateInputAttachmentsFactory } from '@/modules/comments/services/commentTextService' import { getBlobsFactory } from '@/modules/blobstorage/repositories' import { createCommitByBranchIdFactory } from '@/modules/core/services/commit/management' -import { saveActivityFactory } from '@/modules/activitystream/repositories' -import { publish } from '@/modules/shared/utils/subscriptions' import { getUserFactory } from '@/modules/core/repositories/users' import { createObjectFactory } from '@/modules/core/services/objects/management' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' -import { db, mainDb } from '@/db/knex' +import { db } from '@/db/knex' import { getEventBus } from '@/modules/shared/services/eventBus' const command: CommandModule< @@ -141,13 +135,7 @@ const command: CommandModule< insertComments, insertCommentLinks, markCommentViewed, - emitEvent: getEventBus().emit, - addCommentCreatedActivity: addCommentCreatedActivityFactory({ - getViewerResourcesFromLegacyIdentifiers, - getViewerResourceItemsUngrouped, - saveActivity: saveActivityFactory({ db: mainDb }), - publish - }) + emitEvent: getEventBus().emit }) const createCommentReplyAndNotify = createCommentReplyAndNotifyFactory({ @@ -157,13 +145,9 @@ const command: CommandModule< insertCommentLinks, markCommentUpdated: markCommentUpdatedFactory({ db: projectDb }), emitEvent: getEventBus().emit, - addReplyAddedActivity: addReplyAddedActivityFactory({ - getViewerResourcesForComment: getViewerResourcesForCommentFactory({ - getCommentsResources: getCommentsResourcesFactory({ db: projectDb }), - getViewerResourcesFromLegacyIdentifiers - }), - saveActivity: saveActivityFactory({ db: mainDb }), - publish + getViewerResourcesForComment: getViewerResourcesForCommentFactory({ + getCommentsResources: getCommentsResourcesFactory({ db: projectDb }), + getViewerResourcesFromLegacyIdentifiers }) }) diff --git a/packages/server/modules/cli/commands/download/project.ts b/packages/server/modules/cli/commands/download/project.ts index b1e73f478..8faf71097 100644 --- a/packages/server/modules/cli/commands/download/project.ts +++ b/packages/server/modules/cli/commands/download/project.ts @@ -26,10 +26,6 @@ import { createCommentThreadAndNotifyFactory } from '@/modules/comments/services/management' import { createBranchAndNotifyFactory } from '@/modules/core/services/branch/management' -import { - addCommentCreatedActivityFactory, - addReplyAddedActivityFactory -} from '@/modules/activitystream/services/commentActivity' import { createCommitFactory, getAllBranchCommitsFactory, @@ -45,7 +41,7 @@ import { getViewerResourcesForCommentsFactory, getViewerResourcesFromLegacyIdentifiersFactory } from '@/modules/core/services/commit/viewerResources' -import { db, mainDb } from '@/db/knex' +import { db } from '@/db/knex' import { getCommentFactory, getCommentsResourcesFactory, @@ -56,8 +52,6 @@ import { } from '@/modules/comments/repositories/comments' import { getBlobsFactory } from '@/modules/blobstorage/repositories' import { validateInputAttachmentsFactory } from '@/modules/comments/services/commentTextService' -import { saveActivityFactory } from '@/modules/activitystream/repositories' -import { publish } from '@/modules/shared/utils/subscriptions' import { getUserFactory } from '@/modules/core/repositories/users' import { createObjectFactory } from '@/modules/core/services/objects/management' import { authorizeResolver } from '@/modules/shared' @@ -166,13 +160,7 @@ const command: CommandModule< insertComments, insertCommentLinks, markCommentViewed, - emitEvent: getEventBus().emit, - addCommentCreatedActivity: addCommentCreatedActivityFactory({ - getViewerResourcesFromLegacyIdentifiers, - getViewerResourceItemsUngrouped, - saveActivity: saveActivityFactory({ db: mainDb }), - publish - }) + emitEvent: getEventBus().emit }) const createCommentReplyAndNotify = createCommentReplyAndNotifyFactory({ getComment: getCommentFactory({ db: projectDb }), @@ -181,13 +169,9 @@ const command: CommandModule< insertCommentLinks, markCommentUpdated: markCommentUpdatedFactory({ db: projectDb }), emitEvent: getEventBus().emit, - addReplyAddedActivity: addReplyAddedActivityFactory({ - getViewerResourcesForComment: getViewerResourcesForCommentFactory({ - getCommentsResources: getCommentsResourcesFactory({ db: projectDb }), - getViewerResourcesFromLegacyIdentifiers - }), - saveActivity: saveActivityFactory({ db: mainDb }), - publish + getViewerResourcesForComment: getViewerResourcesForCommentFactory({ + getCommentsResources: getCommentsResourcesFactory({ db: projectDb }), + getViewerResourcesFromLegacyIdentifiers }) }) diff --git a/packages/server/modules/comments/domain/events.ts b/packages/server/modules/comments/domain/events.ts index 30f73395e..6e19679d8 100644 --- a/packages/server/modules/comments/domain/events.ts +++ b/packages/server/modules/comments/domain/events.ts @@ -1,16 +1,33 @@ +import { + CommentCreatedActivityInput, + ReplyCreatedActivityInput +} from '@/modules/activitystream/domain/types' +import { ViewerResourceItem } from '@/modules/comments/domain/types' import { CommentRecord } from '@/modules/comments/helpers/types' +import { MutationCommentArchiveArgs } from '@/modules/core/graph/generated/graphql' export const commentEventsNamespace = 'comments' as const export const CommentEvents = { Created: `${commentEventsNamespace}.created`, - Updated: `${commentEventsNamespace}.updated` + Updated: `${commentEventsNamespace}.updated`, + Archived: `${commentEventsNamespace}.archived` } as const export type CommentEventsPayloads = { - [CommentEvents.Created]: { comment: CommentRecord } + [CommentEvents.Created]: { + comment: CommentRecord + isThread: boolean + input: CommentCreatedActivityInput | ReplyCreatedActivityInput + resourceItems: ViewerResourceItem[] + } [CommentEvents.Updated]: { previousComment: CommentRecord newComment: CommentRecord } + [CommentEvents.Archived]: { + userId: string + input: MutationCommentArchiveArgs + comment: CommentRecord + } } diff --git a/packages/server/modules/comments/events/subscriptionListeners.ts b/packages/server/modules/comments/events/subscriptionListeners.ts new file mode 100644 index 000000000..f6368aa20 --- /dev/null +++ b/packages/server/modules/comments/events/subscriptionListeners.ts @@ -0,0 +1,90 @@ +import { CommentEvents } from '@/modules/comments/domain/events' +import { GetViewerResourcesForComment } from '@/modules/comments/domain/operations' +import { ProjectCommentsUpdatedMessageType } from '@/modules/core/graph/generated/graphql' +import { DependenciesOf } from '@/modules/shared/helpers/factory' +import { EventBusListen, EventPayload } from '@/modules/shared/services/eventBus' +import { + CommentSubscriptions, + ProjectSubscriptions, + PublishSubscription +} from '@/modules/shared/utils/subscriptions' + +const reportCommentCreatedFactory = + (deps: { publish: PublishSubscription }) => + async (payload: EventPayload) => { + const { comment, resourceItems } = payload.payload + + await Promise.all([ + // @deprecated unused in FE2 + deps.publish(CommentSubscriptions.CommentActivity, { + commentActivity: { + type: 'comment-added', + comment + }, + streamId: comment.streamId, + resourceIds: resourceItems.map((i) => i.versionId || i.objectId).join(',') + }), + deps.publish(ProjectSubscriptions.ProjectCommentsUpdated, { + projectCommentsUpdated: { + id: comment.id, + type: ProjectCommentsUpdatedMessageType.Created, + comment + }, + projectId: comment.streamId, + resourceItems + }) + ]) + } + +const reportCommentArchivedFactory = + (deps: { + publish: PublishSubscription + getViewerResourcesForComment: GetViewerResourcesForComment + }) => + async (payload: EventPayload) => { + const { + comment, + input: { archived, streamId } + } = payload.payload + + await Promise.all([ + deps.publish(CommentSubscriptions.CommentThreadActivity, { + commentThreadActivity: { + type: archived ? 'comment-archived' : 'comment-added' + }, + streamId, + commentId: comment.id + }), + deps.publish(ProjectSubscriptions.ProjectCommentsUpdated, { + projectCommentsUpdated: { + id: comment.id, + type: archived + ? ProjectCommentsUpdatedMessageType.Archived + : ProjectCommentsUpdatedMessageType.Created, + comment: archived ? null : comment + }, + projectId: streamId, + resourceItems: await deps.getViewerResourcesForComment(streamId, comment.id) + }) + ]) + } + +export const reportSubscriptionEventsFactory = + ( + deps: { + eventListen: EventBusListen + publish: PublishSubscription + } & DependenciesOf & + DependenciesOf + ) => + () => { + const reportCommentCreated = reportCommentCreatedFactory(deps) + const reportCommentArchived = reportCommentArchivedFactory(deps) + + const quitters = [ + deps.eventListen(CommentEvents.Created, reportCommentCreated), + deps.eventListen(CommentEvents.Archived, reportCommentArchived) + ] + + return () => quitters.forEach((q) => q()) + } diff --git a/packages/server/modules/comments/graph/resolvers/comments.ts b/packages/server/modules/comments/graph/resolvers/comments.ts index f6f97da06..fa59df8a2 100644 --- a/packages/server/modules/comments/graph/resolvers/comments.ts +++ b/packages/server/modules/comments/graph/resolvers/comments.ts @@ -49,11 +49,6 @@ import { filteredSubscribe, ProjectSubscriptions } from '@/modules/shared/utils/subscriptions' -import { - addCommentArchivedActivityFactory, - addCommentCreatedActivityFactory, - addReplyAddedActivityFactory -} from '@/modules/activitystream/services/commentActivity' import { doViewerResourcesFit, getViewerResourcesForCommentFactory, @@ -95,7 +90,6 @@ import { } from '@/modules/core/repositories/branches' import { getStreamObjectsFactory } from '@/modules/core/repositories/objects' import { getStreamFactory } from '@/modules/core/repositories/streams' -import { saveActivityFactory } from '@/modules/activitystream/repositories' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { Knex } from 'knex' import { getEventBus } from '@/modules/shared/services/eventBus' @@ -533,8 +527,6 @@ export = { const getViewerResourceItemsUngrouped = buildGetViewerResourceItemsUngrouped({ db: projectDb }) - const getViewerResourcesFromLegacyIdentifiers = - buildGetViewerResourcesFromLegacyIdentifiers({ db: projectDb }) const validateInputAttachments = validateInputAttachmentsFactory({ getBlobs: getBlobsFactory({ db: projectDb }) @@ -549,13 +541,7 @@ export = { insertComments, insertCommentLinks, markCommentViewed, - emitEvent: getEventBus().emit, - addCommentCreatedActivity: addCommentCreatedActivityFactory({ - getViewerResourcesFromLegacyIdentifiers, - getViewerResourceItemsUngrouped, - saveActivity: saveActivityFactory({ db: mainDb }), - publish - }) + emitEvent: getEventBus().emit }) return await createCommentThreadAndNotify(args.input, ctx.userId!) @@ -588,13 +574,9 @@ export = { insertCommentLinks, markCommentUpdated: markCommentUpdatedFactory({ db: projectDb }), emitEvent: getEventBus().emit, - addReplyAddedActivity: addReplyAddedActivityFactory({ - getViewerResourcesForComment: getViewerResourcesForCommentFactory({ - getCommentsResources: getCommentsResourcesFactory({ db: projectDb }), - getViewerResourcesFromLegacyIdentifiers - }), - saveActivity: saveActivityFactory({ db: mainDb }), - publish + getViewerResourcesForComment: getViewerResourcesForCommentFactory({ + getCommentsResources: getCommentsResourcesFactory({ db: projectDb }), + getViewerResourcesFromLegacyIdentifiers }) }) @@ -656,11 +638,8 @@ export = { getComment, getStream, updateComment, - addCommentArchivedActivity: addCommentArchivedActivityFactory({ - getViewerResourcesForComment, - saveActivity: saveActivityFactory({ db: mainDb }), - publish - }) + getViewerResourcesForComment, + emitEvent: getEventBus().emit }) await archiveCommentAndNotify( @@ -740,6 +719,8 @@ export = { throw new ForbiddenError('You are not authorized.') const projectDb = await getProjectDbClient({ projectId: args.input.streamId }) + const getViewerResourcesFromLegacyIdentifiers = + buildGetViewerResourcesFromLegacyIdentifiers({ db: projectDb }) const createComment = createCommentFactory({ checkStreamResourcesAccess: streamResourceCheckFactory({ @@ -752,31 +733,14 @@ export = { insertCommentLinks: insertCommentLinksFactory({ db: projectDb }), deleteComment: deleteCommentFactory({ db: projectDb }), markCommentViewed: markCommentViewedFactory({ db: projectDb }), - emitEvent: getEventBus().emit + emitEvent: getEventBus().emit, + getViewerResourcesFromLegacyIdentifiers }) const comment = await createComment({ userId: context.userId, input: args.input }) - const getViewerResourceItemsUngrouped = buildGetViewerResourceItemsUngrouped({ - db: projectDb - }) - const getViewerResourcesFromLegacyIdentifiers = - buildGetViewerResourcesFromLegacyIdentifiers({ db: projectDb }) - - await addCommentCreatedActivityFactory({ - getViewerResourceItemsUngrouped, - getViewerResourcesFromLegacyIdentifiers, - saveActivity: saveActivityFactory({ db: mainDb }), - publish - })({ - streamId: args.input.streamId, - userId: context.userId, - input: args.input, - comment - }) - return comment.id }, @@ -828,27 +792,10 @@ export = { const archiveComment = archiveCommentFactory({ getComment: getCommentFactory({ db: projectDb }), getStream, - updateComment: updateCommentFactory({ db: projectDb }) - }) - const updatedComment = await archiveComment({ ...args, userId: context.userId! }) // NOTE: permissions check inside service - - const getViewerResourcesFromLegacyIdentifiers = - buildGetViewerResourcesFromLegacyIdentifiers({ db: projectDb }) - const getViewerResourcesForComment = getViewerResourcesForCommentFactory({ - getCommentsResources: getCommentsResourcesFactory({ db: projectDb }), - getViewerResourcesFromLegacyIdentifiers - }) - await addCommentArchivedActivityFactory({ - getViewerResourcesForComment, - saveActivity: saveActivityFactory({ db: mainDb }), - publish - })({ - streamId: args.streamId, - commentId: args.commentId, - userId: context.userId!, - input: args, - comment: updatedComment + updateComment: updateCommentFactory({ db: projectDb }), + emitEvent: getEventBus().emit }) + await archiveComment({ ...args, userId: context.userId! }) // NOTE: permissions check inside service return true }, @@ -878,7 +825,12 @@ export = { }), deleteComment: deleteCommentFactory({ db: projectDb }), markCommentUpdated: markCommentUpdatedFactory({ db: projectDb }), - emitEvent: getEventBus().emit + emitEvent: getEventBus().emit, + getViewerResourcesForComment: getViewerResourcesForCommentFactory({ + getCommentsResources: getCommentsResourcesFactory({ db: projectDb }), + getViewerResourcesFromLegacyIdentifiers: + buildGetViewerResourcesFromLegacyIdentifiers({ db: projectDb }) + }) }) const reply = await createCommentReply({ authorId: context.userId, @@ -889,22 +841,6 @@ export = { blobIds: args.input.blobIds }) - const getViewerResourcesFromLegacyIdentifiers = - buildGetViewerResourcesFromLegacyIdentifiers({ db: projectDb }) - await addReplyAddedActivityFactory({ - getViewerResourcesForComment: getViewerResourcesForCommentFactory({ - getCommentsResources: getCommentsResourcesFactory({ db: projectDb }), - getViewerResourcesFromLegacyIdentifiers - }), - saveActivity: saveActivityFactory({ db: mainDb }), - publish - })({ - streamId: args.input.streamId, - input: args.input, - reply, - userId: context.userId - }) - return reply.id } }, diff --git a/packages/server/modules/comments/index.ts b/packages/server/modules/comments/index.ts index 0f495fa4c..39cba0970 100644 --- a/packages/server/modules/comments/index.ts +++ b/packages/server/modules/comments/index.ts @@ -2,10 +2,20 @@ import { db } from '@/db/knex' import { moduleLogger } from '@/logging/logging' import { saveActivityFactory } from '@/modules/activitystream/repositories' import { addStreamCommentMentionActivityFactory } from '@/modules/activitystream/services/streamActivity' +import { reportSubscriptionEventsFactory } from '@/modules/comments/events/subscriptionListeners' +import { getCommentsResourcesFactory } from '@/modules/comments/repositories/comments' import { notifyUsersOnCommentEventsFactory } from '@/modules/comments/services/notifications' +import { getCommitsAndTheirBranchIdsFactory } from '@/modules/core/repositories/commits' +import { getStreamObjectsFactory } from '@/modules/core/repositories/objects' +import { + getViewerResourcesForCommentFactory, + getViewerResourcesForCommentsFactory, + getViewerResourcesFromLegacyIdentifiersFactory +} from '@/modules/core/services/commit/viewerResources' import { publishNotification } from '@/modules/notifications/services/publication' import { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper' import { getEventBus } from '@/modules/shared/services/eventBus' +import { publish } from '@/modules/shared/utils/subscriptions' let unsubFromEvents: Optional<() => void> = undefined @@ -22,6 +32,28 @@ const commentsModule: SpeckleModule = { }) }) unsubFromEvents = await notifyUsersOnCommentEvents() + + const getViewerResourcesFromLegacyIdentifiers = + getViewerResourcesFromLegacyIdentifiersFactory({ + getViewerResourcesForComments: getViewerResourcesForCommentsFactory({ + getCommentsResources: getCommentsResourcesFactory({ db }), + getViewerResourcesFromLegacyIdentifiers: (...args) => + getViewerResourcesFromLegacyIdentifiers(...args) // recursive dep + }), + getCommitsAndTheirBranchIds: getCommitsAndTheirBranchIdsFactory({ db }), + getStreamObjects: getStreamObjectsFactory({ db }) + }) + const getViewerResourcesForComment = getViewerResourcesForCommentFactory({ + getCommentsResources: getCommentsResourcesFactory({ db }), + getViewerResourcesFromLegacyIdentifiers: (...args) => + getViewerResourcesFromLegacyIdentifiers(...args) // recursive dep + }) + + reportSubscriptionEventsFactory({ + eventListen: getEventBus().listen, + publish, + getViewerResourcesForComment + })() } }, async finalize() {}, diff --git a/packages/server/modules/comments/services/index.ts b/packages/server/modules/comments/services/index.ts index 6fff95514..e99e37345 100644 --- a/packages/server/modules/comments/services/index.ts +++ b/packages/server/modules/comments/services/index.ts @@ -14,6 +14,8 @@ import { CheckStreamResourcesAccess, DeleteComment, GetComment, + GetViewerResourcesForComment, + GetViewerResourcesFromLegacyIdentifiers, InsertCommentLinks, InsertComments, MarkCommentUpdated, @@ -58,6 +60,7 @@ export const createCommentFactory = deleteComment: DeleteComment markCommentViewed: MarkCommentViewed emitEvent: EventBusEmit + getViewerResourcesFromLegacyIdentifiers: GetViewerResourcesFromLegacyIdentifiers }) => async ({ userId, input }: { userId: string; input: CommentCreateInput }) => { if (input.resources.length < 1) @@ -123,10 +126,17 @@ export const createCommentFactory = await deps.markCommentViewed(id, userId) // so we don't self mark a comment as unread the moment it's created + const resourceItems = await deps.getViewerResourcesFromLegacyIdentifiers( + input.streamId, + input.resources.filter(isNonNullable) + ) await deps.emitEvent({ eventName: CommentEvents.Created, payload: { - comment: newComment + comment: newComment, + input, + isThread: true, + resourceItems } }) @@ -145,6 +155,7 @@ export const createCommentReplyFactory = deleteComment: DeleteComment markCommentUpdated: MarkCommentUpdated emitEvent: EventBusEmit + getViewerResourcesForComment: GetViewerResourcesForComment }) => async ({ authorId, @@ -195,10 +206,24 @@ export const createCommentReplyFactory = await deps.markCommentUpdated(parentCommentId) + const resourceItems = await deps.getViewerResourcesForComment( + newComment.streamId, + newComment.id + ) await deps.emitEvent({ eventName: CommentEvents.Created, payload: { - comment: newComment + comment: newComment, + isThread: false, + input: { + threadId: parentCommentId, + projectId: streamId, + content: { + blobIds, + doc: text + } + }, + resourceItems } }) @@ -256,6 +281,7 @@ export const archiveCommentFactory = getComment: GetComment getStream: GetStream updateComment: UpdateComment + emitEvent: EventBusEmit }) => async ({ commentId, @@ -282,5 +308,15 @@ export const archiveCommentFactory = } const updatedComment = await deps.updateComment(commentId, { archived }) + + await deps.emitEvent({ + eventName: CommentEvents.Archived, + payload: { + userId, + input: { archived, commentId, streamId }, + comment: updatedComment! + } + }) + return updatedComment! } diff --git a/packages/server/modules/comments/services/management.ts b/packages/server/modules/comments/services/management.ts index 0836579b4..a28c19f31 100644 --- a/packages/server/modules/comments/services/management.ts +++ b/packages/server/modules/comments/services/management.ts @@ -26,6 +26,7 @@ import { EditCommentAndNotify, GetComment, GetViewerResourceItemsUngrouped, + GetViewerResourcesForComment, InsertCommentLinks, InsertCommentPayload, InsertComments, @@ -35,11 +36,6 @@ import { ValidateInputAttachments } from '@/modules/comments/domain/operations' import { GetStream } from '@/modules/core/domain/streams/operations' -import { - AddCommentArchivedActivity, - AddCommentCreatedActivity, - AddReplyAddedActivity -} from '@/modules/activitystream/domain/operations' import { EventBusEmit } from '@/modules/shared/services/eventBus' import { CommentEvents } from '@/modules/comments/domain/events' @@ -118,7 +114,6 @@ export const createCommentThreadAndNotifyFactory = insertCommentLinks: InsertCommentLinks markCommentViewed: MarkCommentViewed emitEvent: EventBusEmit - addCommentCreatedActivity: AddCommentCreatedActivity }): CreateCommentThreadAndNotify => async (input: CreateCommentInput, userId: string) => { const [resources] = await Promise.all([ @@ -180,17 +175,11 @@ export const createCommentThreadAndNotifyFactory = deps.emitEvent({ eventName: CommentEvents.Created, payload: { - comment + comment, + input, + isThread: true, + resourceItems: resources } - }), - deps.addCommentCreatedActivity({ - streamId: input.projectId, - userId, - input: { - ...input, - resolvedResourceItems: resources - }, - comment }) ]) @@ -205,7 +194,7 @@ export const createCommentReplyAndNotifyFactory = insertCommentLinks: InsertCommentLinks markCommentUpdated: MarkCommentUpdated emitEvent: EventBusEmit - addReplyAddedActivity: AddReplyAddedActivity + getViewerResourcesForComment: GetViewerResourcesForComment }): CreateCommentReplyAndNotify => async (input: CreateCommentReplyInput, userId: string) => { const thread = await deps.getComment({ id: input.threadId, userId }) @@ -239,19 +228,20 @@ export const createCommentReplyAndNotifyFactory = } // Mark parent comment updated and emit events + const resourceItems = await deps.getViewerResourcesForComment( + reply.streamId, + reply.id + ) await Promise.all([ deps.markCommentUpdated(thread.id), deps.emitEvent({ eventName: CommentEvents.Created, payload: { - comment: reply + comment: reply, + input, + isThread: false, + resourceItems } - }), - deps.addReplyAddedActivity({ - streamId: thread.streamId, - input, - reply, - userId }) ]) @@ -298,7 +288,8 @@ export const archiveCommentAndNotifyFactory = getComment: GetComment getStream: GetStream updateComment: UpdateComment - addCommentArchivedActivity: AddCommentArchivedActivity + emitEvent: EventBusEmit + getViewerResourcesForComment: GetViewerResourcesForComment }): ArchiveCommentAndNotify => async (commentId: string, userId: string, archived = true) => { const comment = await deps.getComment({ id: commentId, userId }) @@ -321,16 +312,13 @@ export const archiveCommentAndNotifyFactory = archived }) - await deps.addCommentArchivedActivity({ - streamId: stream.id, - commentId, - userId, - input: { - archived, - streamId: stream.id, - commentId - }, - comment: updatedComment! + await deps.emitEvent({ + eventName: CommentEvents.Archived, + payload: { + userId, + input: { archived, commentId, streamId: stream.id }, + comment: updatedComment! + } }) return updatedComment diff --git a/packages/server/modules/comments/tests/comments.graph.spec.js b/packages/server/modules/comments/tests/comments.graph.spec.js index 5c52ea333..1fcf5b104 100644 --- a/packages/server/modules/comments/tests/comments.graph.spec.js +++ b/packages/server/modules/comments/tests/comments.graph.spec.js @@ -22,7 +22,8 @@ const { markCommentViewedFactory, insertCommentsFactory, insertCommentLinksFactory, - deleteCommentFactory + deleteCommentFactory, + getCommentsResourcesFactory } = require('@/modules/comments/repositories/comments') const { db } = require('@/db/knex') const { @@ -36,7 +37,8 @@ const { const { createCommitFactory, insertStreamCommitsFactory, - insertBranchCommitsFactory + insertBranchCommitsFactory, + getCommitsAndTheirBranchIdsFactory } = require('@/modules/core/repositories/commits') const { getBranchByIdFactory, @@ -53,7 +55,8 @@ const { } = require('@/modules/core/repositories/streams') const { getObjectFactory, - storeSingleObjectIfNotFoundFactory + storeSingleObjectIfNotFoundFactory, + getStreamObjectsFactory } = require('@/modules/core/repositories/objects') const { legacyCreateStreamFactory, @@ -108,6 +111,10 @@ const { } = require('@/modules/serverinvites/services/processing') const { getServerInfoFactory } = require('@/modules/core/repositories/server') const { createObjectFactory } = require('@/modules/core/services/objects/management') +const { + getViewerResourcesFromLegacyIdentifiersFactory, + getViewerResourcesForCommentsFactory +} = require('@/modules/core/services/commit/viewerResources') const getServerInfo = getServerInfoFactory({ db }) const getUser = getUserFactory({ db }) @@ -117,6 +124,18 @@ const streamResourceCheck = streamResourceCheckFactory({ checkStreamResourceAccess: checkStreamResourceAccessFactory({ db }) }) const markCommentViewed = markCommentViewedFactory({ db }) + +const getViewerResourcesFromLegacyIdentifiers = + getViewerResourcesFromLegacyIdentifiersFactory({ + getViewerResourcesForComments: getViewerResourcesForCommentsFactory({ + getCommentsResources: getCommentsResourcesFactory({ db }), + getViewerResourcesFromLegacyIdentifiers: (...args) => + getViewerResourcesFromLegacyIdentifiers(...args) // recursive dep + }), + getCommitsAndTheirBranchIds: getCommitsAndTheirBranchIdsFactory({ db }), + getStreamObjects: getStreamObjectsFactory({ db }) + }) + const createComment = createCommentFactory({ checkStreamResourcesAccess: streamResourceCheck, validateInputAttachments: validateInputAttachmentsFactory({ @@ -126,7 +145,8 @@ const createComment = createCommentFactory({ insertCommentLinks: insertCommentLinksFactory({ db }), deleteComment: deleteCommentFactory({ db }), markCommentViewed, - emitEvent: getEventBus().emit + emitEvent: getEventBus().emit, + getViewerResourcesFromLegacyIdentifiers }) const getObject = getObjectFactory({ db }) diff --git a/packages/server/modules/comments/tests/comments.spec.ts b/packages/server/modules/comments/tests/comments.spec.ts index 69ce60000..199aa4763 100644 --- a/packages/server/modules/comments/tests/comments.spec.ts +++ b/packages/server/modules/comments/tests/comments.spec.ts @@ -43,7 +43,8 @@ import { updateCommentFactory, getCommentsLegacyFactory, getResourceCommentCountFactory, - getStreamCommentCountFactory + getStreamCommentCountFactory, + getCommentsResourcesFactory } from '@/modules/comments/repositories/comments' import { db } from '@/db/knex' import { getBlobsFactory } from '@/modules/blobstorage/repositories' @@ -59,7 +60,8 @@ import { import { createCommitFactory, insertStreamCommitsFactory, - insertBranchCommitsFactory + insertBranchCommitsFactory, + getCommitsAndTheirBranchIdsFactory } from '@/modules/core/repositories/commits' import { getBranchByIdFactory, @@ -69,7 +71,8 @@ import { } from '@/modules/core/repositories/branches' import { getObjectFactory, - storeSingleObjectIfNotFoundFactory + storeSingleObjectIfNotFoundFactory, + getStreamObjectsFactory } from '@/modules/core/repositories/objects' import { legacyCreateStreamFactory, @@ -117,6 +120,11 @@ import { import { CommentRecord } from '@/modules/comments/helpers/types' import { MaybeNullOrUndefined } from '@speckle/shared' import { CommentEvents } from '@/modules/comments/domain/events' +import { + getViewerResourcesForCommentFactory, + getViewerResourcesForCommentsFactory, + getViewerResourcesFromLegacyIdentifiersFactory +} from '@/modules/core/services/commit/viewerResources' type LegacyCommentRecord = CommentRecord & { total_count: string @@ -137,6 +145,17 @@ const validateInputAttachments = validateInputAttachmentsFactory({ const insertComments = insertCommentsFactory({ db }) const insertCommentLinks = insertCommentLinksFactory({ db }) const deleteComment = deleteCommentFactory({ db }) + +const getViewerResourcesFromLegacyIdentifiers = + getViewerResourcesFromLegacyIdentifiersFactory({ + getViewerResourcesForComments: getViewerResourcesForCommentsFactory({ + getCommentsResources: getCommentsResourcesFactory({ db }), + getViewerResourcesFromLegacyIdentifiers: (...args) => + getViewerResourcesFromLegacyIdentifiers(...args) // recursive dep + }), + getCommitsAndTheirBranchIds: getCommitsAndTheirBranchIdsFactory({ db }), + getStreamObjects: getStreamObjectsFactory({ db }) + }) const createComment = createCommentFactory({ checkStreamResourcesAccess: streamResourceCheck, validateInputAttachments, @@ -144,7 +163,13 @@ const createComment = createCommentFactory({ insertCommentLinks, deleteComment, markCommentViewed, - emitEvent: getEventBus().emit + emitEvent: getEventBus().emit, + getViewerResourcesFromLegacyIdentifiers +}) +const getViewerResourcesForComment = getViewerResourcesForCommentFactory({ + getCommentsResources: getCommentsResourcesFactory({ db }), + getViewerResourcesFromLegacyIdentifiers: (...args) => + getViewerResourcesFromLegacyIdentifiers(...args) // recursive dep }) const createCommentReply = createCommentReplyFactory({ validateInputAttachments, @@ -153,7 +178,8 @@ const createCommentReply = createCommentReplyFactory({ checkStreamResourcesAccess: streamResourceCheck, deleteComment, markCommentUpdated: markCommentUpdatedFactory({ db }), - emitEvent: getEventBus().emit + emitEvent: getEventBus().emit, + getViewerResourcesForComment }) const getComment = getCommentFactory({ db }) const updateComment = updateCommentFactory({ db }) @@ -166,7 +192,8 @@ const editComment = editCommentFactory({ const archiveComment = archiveCommentFactory({ getComment, getStream, - updateComment + updateComment, + emitEvent: getEventBus().emit }) const getComments = getCommentsLegacyFactory({ db }) const getResourceCommentCount = getResourceCommentCountFactory({ db }) diff --git a/packages/server/modules/cross-server-sync/index.ts b/packages/server/modules/cross-server-sync/index.ts index 08faa16db..7e964e8f8 100644 --- a/packages/server/modules/cross-server-sync/index.ts +++ b/packages/server/modules/cross-server-sync/index.ts @@ -1,10 +1,5 @@ import { db } from '@/db/knex' import { moduleLogger, crossServerSyncLogger } from '@/logging/logging' -import { saveActivityFactory } from '@/modules/activitystream/repositories' -import { - addCommentCreatedActivityFactory, - addReplyAddedActivityFactory -} from '@/modules/activitystream/services/commentActivity' import { getBlobsFactory } from '@/modules/blobstorage/repositories' import { getCommentFactory, @@ -71,7 +66,6 @@ import { ensureOnboardingProjectFactory } from '@/modules/cross-server-sync/serv import { downloadProjectFactory } from '@/modules/cross-server-sync/services/project' import { SpeckleModule } from '@/modules/shared/helpers/typeHelper' import { getEventBus } from '@/modules/shared/services/eventBus' -import { publish } from '@/modules/shared/utils/subscriptions' const crossServerSyncModule: SpeckleModule = { init() { @@ -118,13 +112,7 @@ const crossServerSyncModule: SpeckleModule = { insertComments, insertCommentLinks, markCommentViewed, - emitEvent: getEventBus().emit, - addCommentCreatedActivity: addCommentCreatedActivityFactory({ - getViewerResourcesFromLegacyIdentifiers, - getViewerResourceItemsUngrouped, - saveActivity: saveActivityFactory({ db }), - publish - }) + emitEvent: getEventBus().emit }) const createCommentReplyAndNotify = createCommentReplyAndNotifyFactory({ getComment: getCommentFactory({ db }), @@ -133,13 +121,9 @@ const crossServerSyncModule: SpeckleModule = { insertCommentLinks, markCommentUpdated: markCommentUpdatedFactory({ db }), emitEvent: getEventBus().emit, - addReplyAddedActivity: addReplyAddedActivityFactory({ - getViewerResourcesForComment: getViewerResourcesForCommentFactory({ - getCommentsResources: getCommentsResourcesFactory({ db }), - getViewerResourcesFromLegacyIdentifiers - }), - saveActivity: saveActivityFactory({ db }), - publish + getViewerResourcesForComment: getViewerResourcesForCommentFactory({ + getCommentsResources: getCommentsResourcesFactory({ db }), + getViewerResourcesFromLegacyIdentifiers }) }) const getStreamBranchByName = getStreamBranchByNameFactory({ db }) diff --git a/packages/server/modules/shared/utils/subscriptions.ts b/packages/server/modules/shared/utils/subscriptions.ts index 2bbb825c6..5f2feea68 100644 --- a/packages/server/modules/shared/utils/subscriptions.ts +++ b/packages/server/modules/shared/utils/subscriptions.ts @@ -56,7 +56,7 @@ import { SubscriptionWorkspaceUpdatedArgs, WorkspaceUpdatedMessage } from '@/modules/core/graph/generated/graphql' -import { Merge } from 'type-fest' +import { Merge, OverrideProperties } from 'type-fest' import { ModelGraphQLReturn, ProjectGraphQLReturn, @@ -276,8 +276,10 @@ type SubscriptionTypeMap = { } [CommentSubscriptions.CommentThreadActivity]: { payload: { - commentThreadActivity: Partial & - Pick + commentThreadActivity: OverrideProperties< + CommentThreadActivityMessage, + { reply?: CommentRecord } + > streamId: string commentId: string } @@ -299,7 +301,7 @@ type SubscriptionTypeMap = { comment: CommentRecord } streamId: string - resourceIds: string[] + resourceIds: string } variables: SubscriptionCommentActivityArgs }