Merge pull request #3884 from specklesystems/fabians/web-2415-5

chore(server): refactor activityStream invocations - batch #5 - comments
This commit is contained in:
Kristaps Fabians Geikins
2025-02-17 15:15:10 +02:00
committed by GitHub
16 changed files with 425 additions and 405 deletions
@@ -253,26 +253,20 @@ export type AddCommitDeletedActivity = (params: {
branchId: string
}) => Promise<void>
export type AddCommentCreatedActivity = (params: {
streamId: string
userId: string
export type AddThreadCreatedActivity = (params: {
input: CommentCreatedActivityInput
comment: CommentRecord
}) => Promise<void>
export type AddCommentArchivedActivity = (params: {
streamId: string
commentId: string
userId: string
input: MutationCommentArchiveArgs
comment: CommentRecord
}) => Promise<void>
export type AddReplyAddedActivity = (params: {
streamId: string
input: ReplyCreatedActivityInput
reply: CommentRecord
userId: string
}) => Promise<void>
export type AddBranchCreatedActivity = (params: {
@@ -0,0 +1,120 @@
import {
AddThreadCreatedActivity,
AddReplyAddedActivity,
SaveActivity,
AddCommentArchivedActivity
} from '@/modules/activitystream/domain/operations'
import {
CommentCreatedActivityInput,
ReplyCreatedActivityInput
} from '@/modules/activitystream/domain/types'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
import { CommentEvents, CommentEventsPayloads } from '@/modules/comments/domain/events'
import { ReplyCreateInput } from '@/modules/core/graph/generated/graphql'
import { EventBusListen } from '@/modules/shared/services/eventBus'
import { has } from 'lodash'
import { OverrideProperties } from 'type-fest'
const addThreadCreatedActivityFactory =
({ saveActivity }: { saveActivity: SaveActivity }): AddThreadCreatedActivity =>
async (params) => {
const { input, comment } = params
await saveActivity({
resourceId: comment.id,
streamId: comment.streamId,
resourceType: ResourceTypes.Comment,
actionType: ActionTypes.Comment.Create,
userId: comment.authorId,
info: { input },
message: `Comment added: ${comment.id} (${input})`
})
}
const isLegacyReplyCreateInput = (
i: ReplyCreatedActivityInput
): i is ReplyCreateInput => has(i, 'streamId')
const addReplyAddedActivityFactory =
({ saveActivity }: { saveActivity: SaveActivity }): AddReplyAddedActivity =>
async (params) => {
const { input, reply } = params
const parentCommentId = isLegacyReplyCreateInput(input)
? input.parentComment
: input.threadId
await saveActivity({
streamId: reply.streamId,
resourceType: ResourceTypes.Comment,
resourceId: parentCommentId,
actionType: ActionTypes.Comment.Reply,
userId: reply.authorId,
info: { input },
message: `Comment reply #${reply.id} created`
})
}
const addCommentArchivedActivityFactory =
({ saveActivity }: { saveActivity: SaveActivity }): AddCommentArchivedActivity =>
async (params) => {
const { userId, input, comment } = params
await saveActivity({
streamId: comment.streamId,
resourceType: ResourceTypes.Comment,
resourceId: comment.id,
actionType: ActionTypes.Comment.Archive,
userId,
info: { input },
message: `Comment #${comment.id} archived`
})
}
const isReplyCreatedPayload = (
payload: CommentEventsPayloads[typeof CommentEvents.Created]
): payload is OverrideProperties<
CommentEventsPayloads[typeof CommentEvents.Created],
{
input: ReplyCreatedActivityInput
}
> => {
return payload.isThread === false
}
const isThreadCreatedPayload = (
payload: CommentEventsPayloads[typeof CommentEvents.Created]
): payload is OverrideProperties<
CommentEventsPayloads[typeof CommentEvents.Created],
{
input: CommentCreatedActivityInput
}
> => {
return payload.isThread
}
export const reportCommentActivityFactory =
(deps: { eventListen: EventBusListen; saveActivity: SaveActivity }) => () => {
const addThreadCreatedActivity = addThreadCreatedActivityFactory(deps)
const addReplyAddedActivity = addReplyAddedActivityFactory(deps)
const addCommentArchivedActivity = addCommentArchivedActivityFactory(deps)
const quitters = [
deps.eventListen(CommentEvents.Created, async ({ payload }) => {
if (isReplyCreatedPayload(payload)) {
await addReplyAddedActivity({
reply: payload.comment,
input: payload.input
})
} else if (isThreadCreatedPayload(payload)) {
await addThreadCreatedActivity(payload)
}
}),
deps.eventListen(CommentEvents.Archived, async ({ payload }) => {
await addCommentArchivedActivity(payload)
})
]
return () => {
quitters.forEach((quit) => quit())
}
}
@@ -30,6 +30,7 @@ import { reportUserActivityFactory } from '@/modules/activitystream/events/userL
import { reportAccessRequestActivityFactory } from '@/modules/activitystream/events/accessRequestListeners'
import { reportBranchActivityFactory } from '@/modules/activitystream/events/branchListeners'
import { reportCommitActivityFactory } from '@/modules/activitystream/events/commitListeners'
import { reportCommentActivityFactory } from '@/modules/activitystream/events/commentListeners'
let scheduledTask: ReturnType<ScheduleExecution> | null = null
let quitEventListeners: Optional<() => void> = undefined
@@ -62,12 +63,17 @@ const initializeEventListeners = ({
eventListen: eventBus.listen,
saveActivity
})
const reportCommentActivity = reportCommentActivityFactory({
eventListen: eventBus.listen,
saveActivity
})
const quitCbs = [
reportUserActivity(),
reportAccessRequestActivity(),
reportBranchActivity(),
reportCommitActivity(),
reportCommentActivity(),
eventBus.listen(ServerInvitesEvents.Created, async ({ payload }) => {
if (!isProjectResourceTarget(payload.invite.resource)) return
await onServerInviteCreatedFactory({
@@ -1,200 +0,0 @@
import {
AddCommentArchivedActivity,
AddCommentCreatedActivity,
AddReplyAddedActivity,
SaveActivity
} from '@/modules/activitystream/domain/operations'
import {
CommentCreatedActivityInput,
ReplyCreatedActivityInput
} from '@/modules/activitystream/domain/types'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
import {
GetViewerResourceItemsUngrouped,
GetViewerResourcesForComment,
GetViewerResourcesFromLegacyIdentifiers
} from '@/modules/comments/domain/operations'
import { ViewerResourceItem } from '@/modules/comments/domain/types'
import {
CommentCreateInput,
ProjectCommentsUpdatedMessageType,
ReplyCreateInput
} from '@/modules/core/graph/generated/graphql'
import { PublishSubscription, pubsub } from '@/modules/shared/utils/subscriptions'
import {
CommentSubscriptions,
ProjectSubscriptions
} from '@/modules/shared/utils/subscriptions'
import { has } from 'lodash'
const isLegacyCommentCreateInput = (
i: CommentCreatedActivityInput
): i is CommentCreateInput => has(i, 'streamId')
export const addCommentCreatedActivityFactory =
({
getViewerResourceItemsUngrouped,
getViewerResourcesFromLegacyIdentifiers,
saveActivity,
publish
}: {
getViewerResourceItemsUngrouped: GetViewerResourceItemsUngrouped
getViewerResourcesFromLegacyIdentifiers: GetViewerResourcesFromLegacyIdentifiers
saveActivity: SaveActivity
publish: PublishSubscription
}): AddCommentCreatedActivity =>
async (params) => {
const { streamId, userId, input, comment } = params
let resourceIds: string
let resourceItems: ViewerResourceItem[]
if (isLegacyCommentCreateInput(input)) {
resourceIds = input.resources.map((res) => res?.resourceId).join(',')
const validResources = input.resources.filter(
(r): r is NonNullable<typeof r> => !!r
)
resourceItems = await getViewerResourcesFromLegacyIdentifiers(
streamId,
validResources
)
} else {
resourceItems =
input.resolvedResourceItems ||
(await getViewerResourceItemsUngrouped({
projectId: streamId,
resourceIdString: input.resourceIdString
}))
resourceIds = resourceItems.map((i) => i.versionId || i.objectId).join(',')
}
await Promise.all([
saveActivity({
resourceId: comment.id,
streamId,
resourceType: ResourceTypes.Comment,
actionType: ActionTypes.Comment.Create,
userId,
info: { input },
message: `Comment added: ${comment.id} (${input})`
}),
// @deprecated unused in FE2
pubsub.publish(CommentSubscriptions.CommentActivity, {
commentActivity: {
type: 'comment-added',
comment
},
streamId,
resourceIds
}),
publish(ProjectSubscriptions.ProjectCommentsUpdated, {
projectCommentsUpdated: {
id: comment.id,
type: ProjectCommentsUpdatedMessageType.Created,
comment
},
projectId: streamId,
resourceItems
})
])
}
/**
* Add comment archived/unarchived activity
*/
export const addCommentArchivedActivityFactory =
({
getViewerResourcesForComment,
saveActivity,
publish
}: {
getViewerResourcesForComment: GetViewerResourcesForComment
publish: PublishSubscription
saveActivity: SaveActivity
}): AddCommentArchivedActivity =>
async (params) => {
const { streamId, commentId, userId, input, comment } = params
const isArchiving = !!input.archived
await Promise.all([
saveActivity({
streamId,
resourceType: ResourceTypes.Comment,
resourceId: commentId,
actionType: ActionTypes.Comment.Archive,
userId,
info: { input },
message: `Comment #${commentId} archived`
}),
// @deprecated not used in FE2
pubsub.publish(CommentSubscriptions.CommentThreadActivity, {
commentThreadActivity: {
type: isArchiving ? 'comment-archived' : 'comment-added'
},
streamId: input.streamId,
commentId: input.commentId
}),
publish(ProjectSubscriptions.ProjectCommentsUpdated, {
projectCommentsUpdated: {
id: commentId,
type: isArchiving
? ProjectCommentsUpdatedMessageType.Archived
: ProjectCommentsUpdatedMessageType.Created,
comment: isArchiving ? null : comment
},
projectId: streamId,
resourceItems: await getViewerResourcesForComment(streamId, comment.id)
})
])
}
const isLegacyReplyCreateInput = (
i: ReplyCreatedActivityInput
): i is ReplyCreateInput => has(i, 'streamId')
export const addReplyAddedActivityFactory =
({
getViewerResourcesForComment,
saveActivity,
publish
}: {
getViewerResourcesForComment: GetViewerResourcesForComment
publish: PublishSubscription
saveActivity: SaveActivity
}): AddReplyAddedActivity =>
async (params) => {
const { streamId, input, reply, userId } = params
const parentCommentId = isLegacyReplyCreateInput(input)
? input.parentComment
: input.threadId
await Promise.all([
saveActivity({
streamId,
resourceType: ResourceTypes.Comment,
resourceId: parentCommentId,
actionType: ActionTypes.Comment.Reply,
userId,
info: { input },
message: `Comment reply #${reply.id} created`
}),
// @deprecated
pubsub.publish(CommentSubscriptions.CommentThreadActivity, {
commentThreadActivity: {
type: 'reply-added',
reply
},
streamId,
commentId: parentCommentId
}),
publish(ProjectSubscriptions.ProjectCommentsUpdated, {
projectCommentsUpdated: {
id: reply.id,
type: ProjectCommentsUpdatedMessageType.Created,
comment: reply
},
projectId: streamId,
resourceItems: await getViewerResourcesForComment(streamId, reply.id)
})
])
}
@@ -45,19 +45,13 @@ import {
markCommentUpdatedFactory,
markCommentViewedFactory
} from '@/modules/comments/repositories/comments'
import {
addCommentCreatedActivityFactory,
addReplyAddedActivityFactory
} from '@/modules/activitystream/services/commentActivity'
import { validateInputAttachmentsFactory } from '@/modules/comments/services/commentTextService'
import { getBlobsFactory } from '@/modules/blobstorage/repositories'
import { createCommitByBranchIdFactory } from '@/modules/core/services/commit/management'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { publish } from '@/modules/shared/utils/subscriptions'
import { getUserFactory } from '@/modules/core/repositories/users'
import { createObjectFactory } from '@/modules/core/services/objects/management'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import { db, mainDb } from '@/db/knex'
import { db } from '@/db/knex'
import { getEventBus } from '@/modules/shared/services/eventBus'
const command: CommandModule<
@@ -141,13 +135,7 @@ const command: CommandModule<
insertComments,
insertCommentLinks,
markCommentViewed,
emitEvent: getEventBus().emit,
addCommentCreatedActivity: addCommentCreatedActivityFactory({
getViewerResourcesFromLegacyIdentifiers,
getViewerResourceItemsUngrouped,
saveActivity: saveActivityFactory({ db: mainDb }),
publish
})
emitEvent: getEventBus().emit
})
const createCommentReplyAndNotify = createCommentReplyAndNotifyFactory({
@@ -157,13 +145,9 @@ const command: CommandModule<
insertCommentLinks,
markCommentUpdated: markCommentUpdatedFactory({ db: projectDb }),
emitEvent: getEventBus().emit,
addReplyAddedActivity: addReplyAddedActivityFactory({
getViewerResourcesForComment: getViewerResourcesForCommentFactory({
getCommentsResources: getCommentsResourcesFactory({ db: projectDb }),
getViewerResourcesFromLegacyIdentifiers
}),
saveActivity: saveActivityFactory({ db: mainDb }),
publish
getViewerResourcesForComment: getViewerResourcesForCommentFactory({
getCommentsResources: getCommentsResourcesFactory({ db: projectDb }),
getViewerResourcesFromLegacyIdentifiers
})
})
@@ -26,10 +26,6 @@ import {
createCommentThreadAndNotifyFactory
} from '@/modules/comments/services/management'
import { createBranchAndNotifyFactory } from '@/modules/core/services/branch/management'
import {
addCommentCreatedActivityFactory,
addReplyAddedActivityFactory
} from '@/modules/activitystream/services/commentActivity'
import {
createCommitFactory,
getAllBranchCommitsFactory,
@@ -45,7 +41,7 @@ import {
getViewerResourcesForCommentsFactory,
getViewerResourcesFromLegacyIdentifiersFactory
} from '@/modules/core/services/commit/viewerResources'
import { db, mainDb } from '@/db/knex'
import { db } from '@/db/knex'
import {
getCommentFactory,
getCommentsResourcesFactory,
@@ -56,8 +52,6 @@ import {
} from '@/modules/comments/repositories/comments'
import { getBlobsFactory } from '@/modules/blobstorage/repositories'
import { validateInputAttachmentsFactory } from '@/modules/comments/services/commentTextService'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { publish } from '@/modules/shared/utils/subscriptions'
import { getUserFactory } from '@/modules/core/repositories/users'
import { createObjectFactory } from '@/modules/core/services/objects/management'
import { authorizeResolver } from '@/modules/shared'
@@ -166,13 +160,7 @@ const command: CommandModule<
insertComments,
insertCommentLinks,
markCommentViewed,
emitEvent: getEventBus().emit,
addCommentCreatedActivity: addCommentCreatedActivityFactory({
getViewerResourcesFromLegacyIdentifiers,
getViewerResourceItemsUngrouped,
saveActivity: saveActivityFactory({ db: mainDb }),
publish
})
emitEvent: getEventBus().emit
})
const createCommentReplyAndNotify = createCommentReplyAndNotifyFactory({
getComment: getCommentFactory({ db: projectDb }),
@@ -181,13 +169,9 @@ const command: CommandModule<
insertCommentLinks,
markCommentUpdated: markCommentUpdatedFactory({ db: projectDb }),
emitEvent: getEventBus().emit,
addReplyAddedActivity: addReplyAddedActivityFactory({
getViewerResourcesForComment: getViewerResourcesForCommentFactory({
getCommentsResources: getCommentsResourcesFactory({ db: projectDb }),
getViewerResourcesFromLegacyIdentifiers
}),
saveActivity: saveActivityFactory({ db: mainDb }),
publish
getViewerResourcesForComment: getViewerResourcesForCommentFactory({
getCommentsResources: getCommentsResourcesFactory({ db: projectDb }),
getViewerResourcesFromLegacyIdentifiers
})
})
@@ -1,16 +1,33 @@
import {
CommentCreatedActivityInput,
ReplyCreatedActivityInput
} from '@/modules/activitystream/domain/types'
import { ViewerResourceItem } from '@/modules/comments/domain/types'
import { CommentRecord } from '@/modules/comments/helpers/types'
import { MutationCommentArchiveArgs } from '@/modules/core/graph/generated/graphql'
export const commentEventsNamespace = 'comments' as const
export const CommentEvents = {
Created: `${commentEventsNamespace}.created`,
Updated: `${commentEventsNamespace}.updated`
Updated: `${commentEventsNamespace}.updated`,
Archived: `${commentEventsNamespace}.archived`
} as const
export type CommentEventsPayloads = {
[CommentEvents.Created]: { comment: CommentRecord }
[CommentEvents.Created]: {
comment: CommentRecord
isThread: boolean
input: CommentCreatedActivityInput | ReplyCreatedActivityInput
resourceItems: ViewerResourceItem[]
}
[CommentEvents.Updated]: {
previousComment: CommentRecord
newComment: CommentRecord
}
[CommentEvents.Archived]: {
userId: string
input: MutationCommentArchiveArgs
comment: CommentRecord
}
}
@@ -0,0 +1,90 @@
import { CommentEvents } from '@/modules/comments/domain/events'
import { GetViewerResourcesForComment } from '@/modules/comments/domain/operations'
import { ProjectCommentsUpdatedMessageType } from '@/modules/core/graph/generated/graphql'
import { DependenciesOf } from '@/modules/shared/helpers/factory'
import { EventBusListen, EventPayload } from '@/modules/shared/services/eventBus'
import {
CommentSubscriptions,
ProjectSubscriptions,
PublishSubscription
} from '@/modules/shared/utils/subscriptions'
const reportCommentCreatedFactory =
(deps: { publish: PublishSubscription }) =>
async (payload: EventPayload<typeof CommentEvents.Created>) => {
const { comment, resourceItems } = payload.payload
await Promise.all([
// @deprecated unused in FE2
deps.publish(CommentSubscriptions.CommentActivity, {
commentActivity: {
type: 'comment-added',
comment
},
streamId: comment.streamId,
resourceIds: resourceItems.map((i) => i.versionId || i.objectId).join(',')
}),
deps.publish(ProjectSubscriptions.ProjectCommentsUpdated, {
projectCommentsUpdated: {
id: comment.id,
type: ProjectCommentsUpdatedMessageType.Created,
comment
},
projectId: comment.streamId,
resourceItems
})
])
}
const reportCommentArchivedFactory =
(deps: {
publish: PublishSubscription
getViewerResourcesForComment: GetViewerResourcesForComment
}) =>
async (payload: EventPayload<typeof CommentEvents.Archived>) => {
const {
comment,
input: { archived, streamId }
} = payload.payload
await Promise.all([
deps.publish(CommentSubscriptions.CommentThreadActivity, {
commentThreadActivity: {
type: archived ? 'comment-archived' : 'comment-added'
},
streamId,
commentId: comment.id
}),
deps.publish(ProjectSubscriptions.ProjectCommentsUpdated, {
projectCommentsUpdated: {
id: comment.id,
type: archived
? ProjectCommentsUpdatedMessageType.Archived
: ProjectCommentsUpdatedMessageType.Created,
comment: archived ? null : comment
},
projectId: streamId,
resourceItems: await deps.getViewerResourcesForComment(streamId, comment.id)
})
])
}
export const reportSubscriptionEventsFactory =
(
deps: {
eventListen: EventBusListen
publish: PublishSubscription
} & DependenciesOf<typeof reportCommentCreatedFactory> &
DependenciesOf<typeof reportCommentArchivedFactory>
) =>
() => {
const reportCommentCreated = reportCommentCreatedFactory(deps)
const reportCommentArchived = reportCommentArchivedFactory(deps)
const quitters = [
deps.eventListen(CommentEvents.Created, reportCommentCreated),
deps.eventListen(CommentEvents.Archived, reportCommentArchived)
]
return () => quitters.forEach((q) => q())
}
@@ -49,11 +49,6 @@ import {
filteredSubscribe,
ProjectSubscriptions
} from '@/modules/shared/utils/subscriptions'
import {
addCommentArchivedActivityFactory,
addCommentCreatedActivityFactory,
addReplyAddedActivityFactory
} from '@/modules/activitystream/services/commentActivity'
import {
doViewerResourcesFit,
getViewerResourcesForCommentFactory,
@@ -95,7 +90,6 @@ import {
} from '@/modules/core/repositories/branches'
import { getStreamObjectsFactory } from '@/modules/core/repositories/objects'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import { Knex } from 'knex'
import { getEventBus } from '@/modules/shared/services/eventBus'
@@ -533,8 +527,6 @@ export = {
const getViewerResourceItemsUngrouped = buildGetViewerResourceItemsUngrouped({
db: projectDb
})
const getViewerResourcesFromLegacyIdentifiers =
buildGetViewerResourcesFromLegacyIdentifiers({ db: projectDb })
const validateInputAttachments = validateInputAttachmentsFactory({
getBlobs: getBlobsFactory({ db: projectDb })
@@ -549,13 +541,7 @@ export = {
insertComments,
insertCommentLinks,
markCommentViewed,
emitEvent: getEventBus().emit,
addCommentCreatedActivity: addCommentCreatedActivityFactory({
getViewerResourcesFromLegacyIdentifiers,
getViewerResourceItemsUngrouped,
saveActivity: saveActivityFactory({ db: mainDb }),
publish
})
emitEvent: getEventBus().emit
})
return await createCommentThreadAndNotify(args.input, ctx.userId!)
@@ -588,13 +574,9 @@ export = {
insertCommentLinks,
markCommentUpdated: markCommentUpdatedFactory({ db: projectDb }),
emitEvent: getEventBus().emit,
addReplyAddedActivity: addReplyAddedActivityFactory({
getViewerResourcesForComment: getViewerResourcesForCommentFactory({
getCommentsResources: getCommentsResourcesFactory({ db: projectDb }),
getViewerResourcesFromLegacyIdentifiers
}),
saveActivity: saveActivityFactory({ db: mainDb }),
publish
getViewerResourcesForComment: getViewerResourcesForCommentFactory({
getCommentsResources: getCommentsResourcesFactory({ db: projectDb }),
getViewerResourcesFromLegacyIdentifiers
})
})
@@ -656,11 +638,8 @@ export = {
getComment,
getStream,
updateComment,
addCommentArchivedActivity: addCommentArchivedActivityFactory({
getViewerResourcesForComment,
saveActivity: saveActivityFactory({ db: mainDb }),
publish
})
getViewerResourcesForComment,
emitEvent: getEventBus().emit
})
await archiveCommentAndNotify(
@@ -740,6 +719,8 @@ export = {
throw new ForbiddenError('You are not authorized.')
const projectDb = await getProjectDbClient({ projectId: args.input.streamId })
const getViewerResourcesFromLegacyIdentifiers =
buildGetViewerResourcesFromLegacyIdentifiers({ db: projectDb })
const createComment = createCommentFactory({
checkStreamResourcesAccess: streamResourceCheckFactory({
@@ -752,31 +733,14 @@ export = {
insertCommentLinks: insertCommentLinksFactory({ db: projectDb }),
deleteComment: deleteCommentFactory({ db: projectDb }),
markCommentViewed: markCommentViewedFactory({ db: projectDb }),
emitEvent: getEventBus().emit
emitEvent: getEventBus().emit,
getViewerResourcesFromLegacyIdentifiers
})
const comment = await createComment({
userId: context.userId,
input: args.input
})
const getViewerResourceItemsUngrouped = buildGetViewerResourceItemsUngrouped({
db: projectDb
})
const getViewerResourcesFromLegacyIdentifiers =
buildGetViewerResourcesFromLegacyIdentifiers({ db: projectDb })
await addCommentCreatedActivityFactory({
getViewerResourceItemsUngrouped,
getViewerResourcesFromLegacyIdentifiers,
saveActivity: saveActivityFactory({ db: mainDb }),
publish
})({
streamId: args.input.streamId,
userId: context.userId,
input: args.input,
comment
})
return comment.id
},
@@ -828,27 +792,10 @@ export = {
const archiveComment = archiveCommentFactory({
getComment: getCommentFactory({ db: projectDb }),
getStream,
updateComment: updateCommentFactory({ db: projectDb })
})
const updatedComment = await archiveComment({ ...args, userId: context.userId! }) // NOTE: permissions check inside service
const getViewerResourcesFromLegacyIdentifiers =
buildGetViewerResourcesFromLegacyIdentifiers({ db: projectDb })
const getViewerResourcesForComment = getViewerResourcesForCommentFactory({
getCommentsResources: getCommentsResourcesFactory({ db: projectDb }),
getViewerResourcesFromLegacyIdentifiers
})
await addCommentArchivedActivityFactory({
getViewerResourcesForComment,
saveActivity: saveActivityFactory({ db: mainDb }),
publish
})({
streamId: args.streamId,
commentId: args.commentId,
userId: context.userId!,
input: args,
comment: updatedComment
updateComment: updateCommentFactory({ db: projectDb }),
emitEvent: getEventBus().emit
})
await archiveComment({ ...args, userId: context.userId! }) // NOTE: permissions check inside service
return true
},
@@ -878,7 +825,12 @@ export = {
}),
deleteComment: deleteCommentFactory({ db: projectDb }),
markCommentUpdated: markCommentUpdatedFactory({ db: projectDb }),
emitEvent: getEventBus().emit
emitEvent: getEventBus().emit,
getViewerResourcesForComment: getViewerResourcesForCommentFactory({
getCommentsResources: getCommentsResourcesFactory({ db: projectDb }),
getViewerResourcesFromLegacyIdentifiers:
buildGetViewerResourcesFromLegacyIdentifiers({ db: projectDb })
})
})
const reply = await createCommentReply({
authorId: context.userId,
@@ -889,22 +841,6 @@ export = {
blobIds: args.input.blobIds
})
const getViewerResourcesFromLegacyIdentifiers =
buildGetViewerResourcesFromLegacyIdentifiers({ db: projectDb })
await addReplyAddedActivityFactory({
getViewerResourcesForComment: getViewerResourcesForCommentFactory({
getCommentsResources: getCommentsResourcesFactory({ db: projectDb }),
getViewerResourcesFromLegacyIdentifiers
}),
saveActivity: saveActivityFactory({ db: mainDb }),
publish
})({
streamId: args.input.streamId,
input: args.input,
reply,
userId: context.userId
})
return reply.id
}
},
+32
View File
@@ -2,10 +2,20 @@ import { db } from '@/db/knex'
import { moduleLogger } from '@/logging/logging'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { addStreamCommentMentionActivityFactory } from '@/modules/activitystream/services/streamActivity'
import { reportSubscriptionEventsFactory } from '@/modules/comments/events/subscriptionListeners'
import { getCommentsResourcesFactory } from '@/modules/comments/repositories/comments'
import { notifyUsersOnCommentEventsFactory } from '@/modules/comments/services/notifications'
import { getCommitsAndTheirBranchIdsFactory } from '@/modules/core/repositories/commits'
import { getStreamObjectsFactory } from '@/modules/core/repositories/objects'
import {
getViewerResourcesForCommentFactory,
getViewerResourcesForCommentsFactory,
getViewerResourcesFromLegacyIdentifiersFactory
} from '@/modules/core/services/commit/viewerResources'
import { publishNotification } from '@/modules/notifications/services/publication'
import { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { publish } from '@/modules/shared/utils/subscriptions'
let unsubFromEvents: Optional<() => void> = undefined
@@ -22,6 +32,28 @@ const commentsModule: SpeckleModule = {
})
})
unsubFromEvents = await notifyUsersOnCommentEvents()
const getViewerResourcesFromLegacyIdentifiers =
getViewerResourcesFromLegacyIdentifiersFactory({
getViewerResourcesForComments: getViewerResourcesForCommentsFactory({
getCommentsResources: getCommentsResourcesFactory({ db }),
getViewerResourcesFromLegacyIdentifiers: (...args) =>
getViewerResourcesFromLegacyIdentifiers(...args) // recursive dep
}),
getCommitsAndTheirBranchIds: getCommitsAndTheirBranchIdsFactory({ db }),
getStreamObjects: getStreamObjectsFactory({ db })
})
const getViewerResourcesForComment = getViewerResourcesForCommentFactory({
getCommentsResources: getCommentsResourcesFactory({ db }),
getViewerResourcesFromLegacyIdentifiers: (...args) =>
getViewerResourcesFromLegacyIdentifiers(...args) // recursive dep
})
reportSubscriptionEventsFactory({
eventListen: getEventBus().listen,
publish,
getViewerResourcesForComment
})()
}
},
async finalize() {},
@@ -14,6 +14,8 @@ import {
CheckStreamResourcesAccess,
DeleteComment,
GetComment,
GetViewerResourcesForComment,
GetViewerResourcesFromLegacyIdentifiers,
InsertCommentLinks,
InsertComments,
MarkCommentUpdated,
@@ -58,6 +60,7 @@ export const createCommentFactory =
deleteComment: DeleteComment
markCommentViewed: MarkCommentViewed
emitEvent: EventBusEmit
getViewerResourcesFromLegacyIdentifiers: GetViewerResourcesFromLegacyIdentifiers
}) =>
async ({ userId, input }: { userId: string; input: CommentCreateInput }) => {
if (input.resources.length < 1)
@@ -123,10 +126,17 @@ export const createCommentFactory =
await deps.markCommentViewed(id, userId) // so we don't self mark a comment as unread the moment it's created
const resourceItems = await deps.getViewerResourcesFromLegacyIdentifiers(
input.streamId,
input.resources.filter(isNonNullable)
)
await deps.emitEvent({
eventName: CommentEvents.Created,
payload: {
comment: newComment
comment: newComment,
input,
isThread: true,
resourceItems
}
})
@@ -145,6 +155,7 @@ export const createCommentReplyFactory =
deleteComment: DeleteComment
markCommentUpdated: MarkCommentUpdated
emitEvent: EventBusEmit
getViewerResourcesForComment: GetViewerResourcesForComment
}) =>
async ({
authorId,
@@ -195,10 +206,24 @@ export const createCommentReplyFactory =
await deps.markCommentUpdated(parentCommentId)
const resourceItems = await deps.getViewerResourcesForComment(
newComment.streamId,
newComment.id
)
await deps.emitEvent({
eventName: CommentEvents.Created,
payload: {
comment: newComment
comment: newComment,
isThread: false,
input: {
threadId: parentCommentId,
projectId: streamId,
content: {
blobIds,
doc: text
}
},
resourceItems
}
})
@@ -256,6 +281,7 @@ export const archiveCommentFactory =
getComment: GetComment
getStream: GetStream
updateComment: UpdateComment
emitEvent: EventBusEmit
}) =>
async ({
commentId,
@@ -282,5 +308,15 @@ export const archiveCommentFactory =
}
const updatedComment = await deps.updateComment(commentId, { archived })
await deps.emitEvent({
eventName: CommentEvents.Archived,
payload: {
userId,
input: { archived, commentId, streamId },
comment: updatedComment!
}
})
return updatedComment!
}
@@ -26,6 +26,7 @@ import {
EditCommentAndNotify,
GetComment,
GetViewerResourceItemsUngrouped,
GetViewerResourcesForComment,
InsertCommentLinks,
InsertCommentPayload,
InsertComments,
@@ -35,11 +36,6 @@ import {
ValidateInputAttachments
} from '@/modules/comments/domain/operations'
import { GetStream } from '@/modules/core/domain/streams/operations'
import {
AddCommentArchivedActivity,
AddCommentCreatedActivity,
AddReplyAddedActivity
} from '@/modules/activitystream/domain/operations'
import { EventBusEmit } from '@/modules/shared/services/eventBus'
import { CommentEvents } from '@/modules/comments/domain/events'
@@ -118,7 +114,6 @@ export const createCommentThreadAndNotifyFactory =
insertCommentLinks: InsertCommentLinks
markCommentViewed: MarkCommentViewed
emitEvent: EventBusEmit
addCommentCreatedActivity: AddCommentCreatedActivity
}): CreateCommentThreadAndNotify =>
async (input: CreateCommentInput, userId: string) => {
const [resources] = await Promise.all([
@@ -180,17 +175,11 @@ export const createCommentThreadAndNotifyFactory =
deps.emitEvent({
eventName: CommentEvents.Created,
payload: {
comment
comment,
input,
isThread: true,
resourceItems: resources
}
}),
deps.addCommentCreatedActivity({
streamId: input.projectId,
userId,
input: {
...input,
resolvedResourceItems: resources
},
comment
})
])
@@ -205,7 +194,7 @@ export const createCommentReplyAndNotifyFactory =
insertCommentLinks: InsertCommentLinks
markCommentUpdated: MarkCommentUpdated
emitEvent: EventBusEmit
addReplyAddedActivity: AddReplyAddedActivity
getViewerResourcesForComment: GetViewerResourcesForComment
}): CreateCommentReplyAndNotify =>
async (input: CreateCommentReplyInput, userId: string) => {
const thread = await deps.getComment({ id: input.threadId, userId })
@@ -239,19 +228,20 @@ export const createCommentReplyAndNotifyFactory =
}
// Mark parent comment updated and emit events
const resourceItems = await deps.getViewerResourcesForComment(
reply.streamId,
reply.id
)
await Promise.all([
deps.markCommentUpdated(thread.id),
deps.emitEvent({
eventName: CommentEvents.Created,
payload: {
comment: reply
comment: reply,
input,
isThread: false,
resourceItems
}
}),
deps.addReplyAddedActivity({
streamId: thread.streamId,
input,
reply,
userId
})
])
@@ -298,7 +288,8 @@ export const archiveCommentAndNotifyFactory =
getComment: GetComment
getStream: GetStream
updateComment: UpdateComment
addCommentArchivedActivity: AddCommentArchivedActivity
emitEvent: EventBusEmit
getViewerResourcesForComment: GetViewerResourcesForComment
}): ArchiveCommentAndNotify =>
async (commentId: string, userId: string, archived = true) => {
const comment = await deps.getComment({ id: commentId, userId })
@@ -321,16 +312,13 @@ export const archiveCommentAndNotifyFactory =
archived
})
await deps.addCommentArchivedActivity({
streamId: stream.id,
commentId,
userId,
input: {
archived,
streamId: stream.id,
commentId
},
comment: updatedComment!
await deps.emitEvent({
eventName: CommentEvents.Archived,
payload: {
userId,
input: { archived, commentId, streamId: stream.id },
comment: updatedComment!
}
})
return updatedComment
@@ -22,7 +22,8 @@ const {
markCommentViewedFactory,
insertCommentsFactory,
insertCommentLinksFactory,
deleteCommentFactory
deleteCommentFactory,
getCommentsResourcesFactory
} = require('@/modules/comments/repositories/comments')
const { db } = require('@/db/knex')
const {
@@ -36,7 +37,8 @@ const {
const {
createCommitFactory,
insertStreamCommitsFactory,
insertBranchCommitsFactory
insertBranchCommitsFactory,
getCommitsAndTheirBranchIdsFactory
} = require('@/modules/core/repositories/commits')
const {
getBranchByIdFactory,
@@ -53,7 +55,8 @@ const {
} = require('@/modules/core/repositories/streams')
const {
getObjectFactory,
storeSingleObjectIfNotFoundFactory
storeSingleObjectIfNotFoundFactory,
getStreamObjectsFactory
} = require('@/modules/core/repositories/objects')
const {
legacyCreateStreamFactory,
@@ -108,6 +111,10 @@ const {
} = require('@/modules/serverinvites/services/processing')
const { getServerInfoFactory } = require('@/modules/core/repositories/server')
const { createObjectFactory } = require('@/modules/core/services/objects/management')
const {
getViewerResourcesFromLegacyIdentifiersFactory,
getViewerResourcesForCommentsFactory
} = require('@/modules/core/services/commit/viewerResources')
const getServerInfo = getServerInfoFactory({ db })
const getUser = getUserFactory({ db })
@@ -117,6 +124,18 @@ const streamResourceCheck = streamResourceCheckFactory({
checkStreamResourceAccess: checkStreamResourceAccessFactory({ db })
})
const markCommentViewed = markCommentViewedFactory({ db })
const getViewerResourcesFromLegacyIdentifiers =
getViewerResourcesFromLegacyIdentifiersFactory({
getViewerResourcesForComments: getViewerResourcesForCommentsFactory({
getCommentsResources: getCommentsResourcesFactory({ db }),
getViewerResourcesFromLegacyIdentifiers: (...args) =>
getViewerResourcesFromLegacyIdentifiers(...args) // recursive dep
}),
getCommitsAndTheirBranchIds: getCommitsAndTheirBranchIdsFactory({ db }),
getStreamObjects: getStreamObjectsFactory({ db })
})
const createComment = createCommentFactory({
checkStreamResourcesAccess: streamResourceCheck,
validateInputAttachments: validateInputAttachmentsFactory({
@@ -126,7 +145,8 @@ const createComment = createCommentFactory({
insertCommentLinks: insertCommentLinksFactory({ db }),
deleteComment: deleteCommentFactory({ db }),
markCommentViewed,
emitEvent: getEventBus().emit
emitEvent: getEventBus().emit,
getViewerResourcesFromLegacyIdentifiers
})
const getObject = getObjectFactory({ db })
@@ -43,7 +43,8 @@ import {
updateCommentFactory,
getCommentsLegacyFactory,
getResourceCommentCountFactory,
getStreamCommentCountFactory
getStreamCommentCountFactory,
getCommentsResourcesFactory
} from '@/modules/comments/repositories/comments'
import { db } from '@/db/knex'
import { getBlobsFactory } from '@/modules/blobstorage/repositories'
@@ -59,7 +60,8 @@ import {
import {
createCommitFactory,
insertStreamCommitsFactory,
insertBranchCommitsFactory
insertBranchCommitsFactory,
getCommitsAndTheirBranchIdsFactory
} from '@/modules/core/repositories/commits'
import {
getBranchByIdFactory,
@@ -69,7 +71,8 @@ import {
} from '@/modules/core/repositories/branches'
import {
getObjectFactory,
storeSingleObjectIfNotFoundFactory
storeSingleObjectIfNotFoundFactory,
getStreamObjectsFactory
} from '@/modules/core/repositories/objects'
import {
legacyCreateStreamFactory,
@@ -117,6 +120,11 @@ import {
import { CommentRecord } from '@/modules/comments/helpers/types'
import { MaybeNullOrUndefined } from '@speckle/shared'
import { CommentEvents } from '@/modules/comments/domain/events'
import {
getViewerResourcesForCommentFactory,
getViewerResourcesForCommentsFactory,
getViewerResourcesFromLegacyIdentifiersFactory
} from '@/modules/core/services/commit/viewerResources'
type LegacyCommentRecord = CommentRecord & {
total_count: string
@@ -137,6 +145,17 @@ const validateInputAttachments = validateInputAttachmentsFactory({
const insertComments = insertCommentsFactory({ db })
const insertCommentLinks = insertCommentLinksFactory({ db })
const deleteComment = deleteCommentFactory({ db })
const getViewerResourcesFromLegacyIdentifiers =
getViewerResourcesFromLegacyIdentifiersFactory({
getViewerResourcesForComments: getViewerResourcesForCommentsFactory({
getCommentsResources: getCommentsResourcesFactory({ db }),
getViewerResourcesFromLegacyIdentifiers: (...args) =>
getViewerResourcesFromLegacyIdentifiers(...args) // recursive dep
}),
getCommitsAndTheirBranchIds: getCommitsAndTheirBranchIdsFactory({ db }),
getStreamObjects: getStreamObjectsFactory({ db })
})
const createComment = createCommentFactory({
checkStreamResourcesAccess: streamResourceCheck,
validateInputAttachments,
@@ -144,7 +163,13 @@ const createComment = createCommentFactory({
insertCommentLinks,
deleteComment,
markCommentViewed,
emitEvent: getEventBus().emit
emitEvent: getEventBus().emit,
getViewerResourcesFromLegacyIdentifiers
})
const getViewerResourcesForComment = getViewerResourcesForCommentFactory({
getCommentsResources: getCommentsResourcesFactory({ db }),
getViewerResourcesFromLegacyIdentifiers: (...args) =>
getViewerResourcesFromLegacyIdentifiers(...args) // recursive dep
})
const createCommentReply = createCommentReplyFactory({
validateInputAttachments,
@@ -153,7 +178,8 @@ const createCommentReply = createCommentReplyFactory({
checkStreamResourcesAccess: streamResourceCheck,
deleteComment,
markCommentUpdated: markCommentUpdatedFactory({ db }),
emitEvent: getEventBus().emit
emitEvent: getEventBus().emit,
getViewerResourcesForComment
})
const getComment = getCommentFactory({ db })
const updateComment = updateCommentFactory({ db })
@@ -166,7 +192,8 @@ const editComment = editCommentFactory({
const archiveComment = archiveCommentFactory({
getComment,
getStream,
updateComment
updateComment,
emitEvent: getEventBus().emit
})
const getComments = getCommentsLegacyFactory({ db })
const getResourceCommentCount = getResourceCommentCountFactory({ db })
@@ -1,10 +1,5 @@
import { db } from '@/db/knex'
import { moduleLogger, crossServerSyncLogger } from '@/logging/logging'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import {
addCommentCreatedActivityFactory,
addReplyAddedActivityFactory
} from '@/modules/activitystream/services/commentActivity'
import { getBlobsFactory } from '@/modules/blobstorage/repositories'
import {
getCommentFactory,
@@ -71,7 +66,6 @@ import { ensureOnboardingProjectFactory } from '@/modules/cross-server-sync/serv
import { downloadProjectFactory } from '@/modules/cross-server-sync/services/project'
import { SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { publish } from '@/modules/shared/utils/subscriptions'
const crossServerSyncModule: SpeckleModule = {
init() {
@@ -118,13 +112,7 @@ const crossServerSyncModule: SpeckleModule = {
insertComments,
insertCommentLinks,
markCommentViewed,
emitEvent: getEventBus().emit,
addCommentCreatedActivity: addCommentCreatedActivityFactory({
getViewerResourcesFromLegacyIdentifiers,
getViewerResourceItemsUngrouped,
saveActivity: saveActivityFactory({ db }),
publish
})
emitEvent: getEventBus().emit
})
const createCommentReplyAndNotify = createCommentReplyAndNotifyFactory({
getComment: getCommentFactory({ db }),
@@ -133,13 +121,9 @@ const crossServerSyncModule: SpeckleModule = {
insertCommentLinks,
markCommentUpdated: markCommentUpdatedFactory({ db }),
emitEvent: getEventBus().emit,
addReplyAddedActivity: addReplyAddedActivityFactory({
getViewerResourcesForComment: getViewerResourcesForCommentFactory({
getCommentsResources: getCommentsResourcesFactory({ db }),
getViewerResourcesFromLegacyIdentifiers
}),
saveActivity: saveActivityFactory({ db }),
publish
getViewerResourcesForComment: getViewerResourcesForCommentFactory({
getCommentsResources: getCommentsResourcesFactory({ db }),
getViewerResourcesFromLegacyIdentifiers
})
})
const getStreamBranchByName = getStreamBranchByNameFactory({ db })
@@ -56,7 +56,7 @@ import {
SubscriptionWorkspaceUpdatedArgs,
WorkspaceUpdatedMessage
} from '@/modules/core/graph/generated/graphql'
import { Merge } from 'type-fest'
import { Merge, OverrideProperties } from 'type-fest'
import {
ModelGraphQLReturn,
ProjectGraphQLReturn,
@@ -276,8 +276,10 @@ type SubscriptionTypeMap = {
}
[CommentSubscriptions.CommentThreadActivity]: {
payload: {
commentThreadActivity: Partial<CommentThreadActivityMessage> &
Pick<CommentThreadActivityMessage, 'type'>
commentThreadActivity: OverrideProperties<
CommentThreadActivityMessage,
{ reply?: CommentRecord }
>
streamId: string
commentId: string
}
@@ -299,7 +301,7 @@ type SubscriptionTypeMap = {
comment: CommentRecord
}
streamId: string
resourceIds: string[]
resourceIds: string
}
variables: SubscriptionCommentActivityArgs
}