fix(notifications): revert notifications except migration (#5671)

This commit is contained in:
Daniel Gak Anagrov
2025-10-06 14:48:10 +02:00
committed by GitHub
parent 476239b2a5
commit e8c960fc30
64 changed files with 302 additions and 2494 deletions
@@ -2064,7 +2064,6 @@ export type Mutation = {
*/
inviteResend: Scalars['Boolean']['output'];
modelMutations: ModelMutations;
notificationMutations: NotificationMutations;
/** @deprecated Part of the old API surface and will be removed in the future. */
objectCreate: Array<Scalars['String']['output']>;
projectMutations: ProjectMutations;
@@ -4978,8 +4977,6 @@ export type User = {
meta: UserMeta;
name: Scalars['String']['output'];
notificationPreferences: Scalars['JSONObject']['output'];
/** List all notifications for the user */
notifications: UserNotificationCollection;
permissions: RootPermissionChecks;
profiles?: Maybe<Scalars['JSONObject']['output']>;
/** Get pending project access request, that the user made */
@@ -5065,16 +5062,6 @@ export type UserFavoriteStreamsArgs = {
};
/**
* Full user type, should only be used in the context of admin operations or
* when a user is reading/writing info about himself
*/
export type UserNotificationsArgs = {
cursor?: InputMaybe<Scalars['String']['input']>;
limit?: InputMaybe<Scalars['Int']['input']>;
};
/**
* Full user type, should only be used in the context of admin operations or
* when a user is reading/writing info about himself
@@ -5265,13 +5252,6 @@ export type UserMetaMutationsSetSpeckleConBannerDismissedArgs = {
value: Scalars['Boolean']['input'];
};
export type UserNotificationCollection = {
__typename?: 'UserNotificationCollection';
cursor?: Maybe<Scalars['String']['output']>;
items: Array<Notification>;
totalCount: Scalars['Int']['output'];
};
export type UserProjectCollection = {
__typename?: 'UserProjectCollection';
cursor?: Maybe<Scalars['String']['output']>;
@@ -9523,8 +9503,6 @@ export type AllObjectTypes = {
ModelsTreeItem: ModelsTreeItem,
ModelsTreeItemCollection: ModelsTreeItemCollection,
Mutation: Mutation,
Notification: Notification,
NotificationMutations: NotificationMutations,
Object: Object,
ObjectCollection: ObjectCollection,
PasswordStrengthCheckFeedback: PasswordStrengthCheckFeedback,
@@ -9605,7 +9583,6 @@ export type AllObjectTypes = {
UserGendoAICredits: UserGendoAiCredits,
UserMeta: UserMeta,
UserMetaMutations: UserMetaMutations,
UserNotificationCollection: UserNotificationCollection,
UserProjectCollection: UserProjectCollection,
UserProjectsUpdatedMessage: UserProjectsUpdatedMessage,
UserSearchResultCollection: UserSearchResultCollection,
@@ -10355,7 +10332,6 @@ export type MutationFieldArgs = {
inviteDelete: MutationInviteDeleteArgs,
inviteResend: MutationInviteResendArgs,
modelMutations: {},
notificationMutations: {},
objectCreate: MutationObjectCreateArgs,
projectMutations: {},
requestVerification: {},
@@ -10391,18 +10367,6 @@ export type MutationFieldArgs = {
workspaceJoinRequestMutations: {},
workspaceMutations: {},
}
export type NotificationFieldArgs = {
createdAt: {},
id: {},
payload: {},
read: {},
type: {},
updatedAt: {},
}
export type NotificationMutationsFieldArgs = {
bulkDelete: NotificationMutationsBulkDeleteArgs,
bulkUpdate: NotificationMutationsBulkUpdateArgs,
}
export type ObjectFieldArgs = {
applicationId: {},
children: ObjectChildrenArgs,
@@ -11055,7 +11019,6 @@ export type UserFieldArgs = {
meta: {},
name: {},
notificationPreferences: {},
notifications: UserNotificationsArgs,
permissions: {},
profiles: {},
projectAccessRequest: UserProjectAccessRequestArgs,
@@ -11110,11 +11073,6 @@ export type UserMetaMutationsFieldArgs = {
setSpeckleCon25BannerDismissed: UserMetaMutationsSetSpeckleCon25BannerDismissedArgs,
setSpeckleConBannerDismissed: UserMetaMutationsSetSpeckleConBannerDismissedArgs,
}
export type UserNotificationCollectionFieldArgs = {
cursor: {},
items: {},
totalCount: {},
}
export type UserProjectCollectionFieldArgs = {
cursor: {},
items: {},
@@ -11530,8 +11488,6 @@ export type AllObjectFieldArgTypes = {
ModelsTreeItem: ModelsTreeItemFieldArgs,
ModelsTreeItemCollection: ModelsTreeItemCollectionFieldArgs,
Mutation: MutationFieldArgs,
Notification: NotificationFieldArgs,
NotificationMutations: NotificationMutationsFieldArgs,
Object: ObjectFieldArgs,
ObjectCollection: ObjectCollectionFieldArgs,
PasswordStrengthCheckFeedback: PasswordStrengthCheckFeedbackFieldArgs,
@@ -11612,7 +11568,6 @@ export type AllObjectFieldArgTypes = {
UserGendoAICredits: UserGendoAiCreditsFieldArgs,
UserMeta: UserMetaFieldArgs,
UserMetaMutations: UserMetaMutationsFieldArgs,
UserNotificationCollection: UserNotificationCollectionFieldArgs,
UserProjectCollection: UserProjectCollectionFieldArgs,
UserProjectsUpdatedMessage: UserProjectsUpdatedMessageFieldArgs,
UserSearchResultCollection: UserSearchResultCollectionFieldArgs,
@@ -11663,4 +11618,3 @@ export type AllObjectFieldArgTypes = {
WorkspaceTeamByRole: WorkspaceTeamByRoleFieldArgs,
WorkspaceUpdatedMessage: WorkspaceUpdatedMessageFieldArgs,
}
@@ -1,48 +0,0 @@
type UserNotificationCollection {
totalCount: Int!
cursor: String
items: [Notification!]!
}
type Notification {
id: ID!
type: String!
read: Boolean!
payload: JSONObject!
createdAt: DateTime!
updatedAt: DateTime!
}
input NotificationUpdateInput {
id: ID!
read: Boolean!
}
extend type User {
"""
List all notifications for the user
"""
notifications(cursor: String, limit: Int): UserNotificationCollection!
@isOwner
@hasScope(scope: "notifications:read")
}
extend type Mutation {
notificationMutations: NotificationMutations! @hasServerRole(role: SERVER_GUEST)
}
type NotificationMutations {
"""
Delete an existing notification
"""
bulkDelete(ids: [String!]!): Boolean!
@hasServerRole(role: SERVER_GUEST)
@hasScope(scope: "notifications:write")
"""
update notidication
"""
bulkUpdate(input: [NotificationUpdateInput!]!): Boolean!
@hasServerRole(role: SERVER_GUEST)
@hasScope(scope: "notifications:write")
}
-2
View File
@@ -67,8 +67,6 @@ const config: CodegenConfig = {
'@/modules/core/helpers/graphTypes#MutationsObjectGraphQLReturn',
AdminMutations:
'@/modules/core/helpers/graphTypes#MutationsObjectGraphQLReturn',
NotificationMutations:
'@/modules/core/helpers/graphTypes#MutationsObjectGraphQLReturn',
AdminQueries: '@/modules/core/helpers/graphTypes#GraphQLEmptyReturn',
ServerStatistics: '@/modules/core/helpers/graphTypes#GraphQLEmptyReturn',
ServerStats: '@/modules/core/helpers/graphTypes#GraphQLEmptyReturn',
@@ -2,7 +2,7 @@ import { db } from '@/db/knex'
import { moduleLogger } from '@/observability/logging'
import { initializeEventListenerFactory } from '@/modules/accessrequests/services/eventListener'
import { getStreamCollaboratorsFactory } from '@/modules/core/repositories/streams'
import { publishNotification } from '@/modules/notifications/services/publication/publishNotification'
import { publishNotification } from '@/modules/notifications/services/publication'
import type { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { getEventBus } from '@/modules/shared/services/eventBus'
@@ -3,7 +3,7 @@ import { isStreamAccessRequest } from '@/modules/accessrequests/repositories'
import type { GetStreamCollaborators } from '@/modules/core/domain/streams/operations'
import { Roles } from '@/modules/core/helpers/mainConstants'
import type { NotificationPublisher } from '@/modules/notifications/helpers/types'
import { NotificationType } from '@speckle/shared/notifications'
import { NotificationType } from '@/modules/notifications/helpers/types'
import type { EventBus, EventPayload } from '@/modules/shared/services/eventBus'
type OnServerAccessRequestCreatedDeps = {
@@ -37,7 +37,7 @@ import {
removeStreamCollaboratorFactory,
validateStreamAccessFactory
} from '@/modules/core/services/streams/access'
import { NotificationType } from '@speckle/shared/notifications'
import { NotificationType } from '@/modules/notifications/helpers/types'
import { authorizeResolver } from '@/modules/shared'
import { getEventBus } from '@/modules/shared/services/eventBus'
import type { BasicTestUser } from '@/test/authHelper'
@@ -62,7 +62,6 @@ import type { BasicTestStream } from '@/test/speckle-helpers/streamHelper'
import { createTestStreams } from '@/test/speckle-helpers/streamHelper'
import { expect } from 'chai'
import { noop } from 'lodash-es'
import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper'
const getUser = getUserFactory({ db })
const getStream = getStreamFactory({ db })
@@ -237,6 +236,12 @@ describe('Project access requests', () => {
})
)
const { getSends } = emailListener.listen({ times: 1 })
const waitForAck = notificationsStateManager.waitForAck(
(e) => e.result?.type === NotificationType.NewStreamAccessRequest
)
const results = await createReq(otherGuysPrivateStream.id)
// req gets created
@@ -252,6 +257,18 @@ describe('Project access requests', () => {
expect(results.data?.projectMutations.accessRequestMutations.create.projectId).to
.be.ok
await waitForAck
// email gets sent out
const sentEmails = getSends()
expect(sentEmails.length).to.eq(1)
const emailParams = sentEmails[0]
expect(emailParams.subject).to.contain('A user requested access to your project')
expect(emailParams.html).to.be.ok
expect(emailParams.text).to.be.ok
expect(emailParams.to).to.eq(otherGuy.email)
// activity stream item inserted
const streamActivity = await getStreamActivities(otherGuysPrivateStream.id, {
actionType: StreamActionTypes.Stream.AccessRequestSent,
@@ -261,33 +278,6 @@ describe('Project access requests', () => {
expect(eventFired).to.be.true
})
!isNotificationListenerEnabled()
? it('sends an email notification ', async () => {
const { getSends } = emailListener.listen({ times: 1 })
const waitForAck = notificationsStateManager.waitForAck(
(e) => e.result?.type === NotificationType.NewStreamAccessRequest
)
const results = await createReq(otherGuysPrivateStream.id)
expect(results).to.not.haveGraphQLErrors()
await waitForAck
// email gets sent out
const sentEmails = getSends()
expect(sentEmails.length).to.eq(1)
const emailParams = sentEmails[0]
expect(emailParams.subject).to.contain(
'A user requested access to your project'
)
expect(emailParams.html).to.be.ok
expect(emailParams.text).to.be.ok
expect(emailParams.to).to.eq(otherGuy.email)
})
: {}
it('operation fails if request already exists', async () => {
const firstResults = await createReq(otherGuysPrivateStream.id)
expect(firstResults).to.not.haveGraphQLErrors()
@@ -38,7 +38,7 @@ import {
removeStreamCollaboratorFactory,
validateStreamAccessFactory
} from '@/modules/core/services/streams/access'
import { NotificationType } from '@speckle/shared/notifications'
import { NotificationType } from '@/modules/notifications/helpers/types'
import { authorizeResolver } from '@/modules/shared'
import { getEventBus } from '@/modules/shared/services/eventBus'
import type { BasicTestUser } from '@/test/authHelper'
@@ -63,7 +63,6 @@ import type { BasicTestStream } from '@/test/speckle-helpers/streamHelper'
import { createTestStreams } from '@/test/speckle-helpers/streamHelper'
import { expect } from 'chai'
import { noop } from 'lodash-es'
import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper'
const getUser = getUserFactory({ db })
const getStreamCollaborators = getStreamCollaboratorsFactory({ db })
@@ -213,6 +212,12 @@ describe('Stream access requests', () => {
})
it('operation succeeds', async () => {
const { getSends } = emailListener.listen({ times: 1 })
const waitForAck = notificationsStateManager.waitForAck(
(e) => e.result?.type === NotificationType.NewStreamAccessRequest
)
const results = await createReq(otherGuysPrivateStream.id)
// req gets created
@@ -225,6 +230,18 @@ describe('Stream access requests', () => {
)
expect(results.data?.streamAccessRequestCreate.streamId).to.be.ok
await waitForAck
// email gets sent out
const sentEmails = getSends()
expect(sentEmails.length).to.eq(1)
const emailParams = sentEmails[0]
expect(emailParams.subject).to.contain('A user requested access to your project')
expect(emailParams.html).to.be.ok
expect(emailParams.text).to.be.ok
expect(emailParams.to).to.eq(otherGuy.email)
// activity stream item inserted
const streamActivity = await getStreamActivities(otherGuysPrivateStream.id, {
actionType: StreamActionTypes.Stream.AccessRequestSent,
@@ -233,35 +250,6 @@ describe('Stream access requests', () => {
expect(streamActivity).to.have.lengthOf(1)
})
!isNotificationListenerEnabled()
? it('sends an email notification', async () => {
const { getSends } = emailListener.listen({ times: 1 })
const waitForAck = notificationsStateManager.waitForAck(
(e) => e.result?.type === NotificationType.NewStreamAccessRequest
)
const results = await createReq(otherGuysPrivateStream.id)
// req gets created
expect(results).to.not.haveGraphQLErrors()
await waitForAck
// email gets sent out
const sentEmails = getSends()
expect(sentEmails.length).to.eq(1)
const emailParams = sentEmails[0]
expect(emailParams.subject).to.contain(
'A user requested access to your project'
)
expect(emailParams.html).to.be.ok
expect(emailParams.text).to.be.ok
expect(emailParams.to).to.eq(otherGuy.email)
})
: {}
it('operation fails if request already exists', async () => {
const firstResults = await createReq(otherGuysPrivateStream.id)
expect(firstResults).to.not.haveGraphQLErrors()
@@ -1,6 +1,6 @@
import type cron from 'node-cron'
import type { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { publishNotification } from '@/modules/notifications/services/publication/publishNotification'
import { publishNotification } from '@/modules/notifications/services/publication'
import { moduleLogger } from '@/observability/logging'
import { weeklyEmailDigestEnabled } from '@/modules/shared/helpers/envHelper'
import type { EventBus } from '@/modules/shared/services/eventBus'
@@ -1,5 +1,5 @@
import type { NotificationPublisher } from '@/modules/notifications/helpers/types'
import { NotificationType } from '@speckle/shared/notifications'
import { NotificationType } from '@/modules/notifications/helpers/types'
import type {
CreateActivitySummary,
GetActiveUserStreams,
@@ -13,9 +13,9 @@ import {
} from '@/modules/activitystream/helpers/types'
import type {
ActivityDigestMessage,
NotificationType,
NotificationTypeMessageMap
} from '@/modules/notifications/helpers/types'
import type { NotificationType } from '@speckle/shared/notifications'
import {
geUserStreamActivityFactory,
saveStreamActivityFactory
@@ -1,6 +1,6 @@
import type { CommandModule } from 'yargs'
import { initializePublicationQueue } from '@/modules/notifications/services/publication/queue'
import { publishNotification } from '@/modules/notifications/services/publication/publishNotification'
import { initializeQueue } from '@/modules/notifications/services/queue'
import { publishNotification } from '@/modules/notifications/services/publication'
import { cliLogger } from '@/observability/logging'
import { sendActivityNotificationsFactory } from '@/modules/activitystream/services/summary'
import { getActiveUserStreamsFactory } from '@/modules/activitystream/repositories'
@@ -18,7 +18,7 @@ const command: CommandModule = {
})
},
handler: async (argv) => {
await initializePublicationQueue()
await initializeQueue()
const numberOfDays = argv.days as number
const end = new Date()
const start = new Date(end.getTime())
@@ -6,7 +6,7 @@ import { BullAdapter } from '@bull-board/api/bullAdapter'
import {
NOTIFICATIONS_QUEUE,
buildNotificationsQueue
} from '@/modules/notifications/services/publication/queue'
} from '@/modules/notifications/services/queue'
import { noop } from 'lodash-es'
import { cliLogger } from '@/observability/logging'
@@ -1,6 +1,6 @@
import { cliLogger } from '@/observability/logging'
import { NotificationType } from '@speckle/shared/notifications'
import { initializePublicationConsumption } from '@/modules/notifications/index'
import { NotificationType } from '@/modules/notifications/helpers/types'
import { initializeConsumption } from '@/modules/notifications/index'
import { EnvironmentResourceError } from '@/modules/shared/errors'
import { get, noop } from 'lodash-es'
import type { CommandModule } from 'yargs'
@@ -12,7 +12,7 @@ const command: CommandModule = {
cliLogger.info('Starting consumption...')
// Overriding handler for test purposes, we don't want the actual mentions logic to run
await initializePublicationConsumption({
await initializeConsumption({
[NotificationType.MentionedInComment]: async (msg, { logger, job }) => {
logger.info('Received test message with payload', msg, job)
@@ -1,8 +1,8 @@
import { cliLogger } from '@/observability/logging'
import type { MentionedInCommentData } from '@/modules/notifications/helpers/types'
import { NotificationType } from '@speckle/shared/notifications'
import { publishNotification } from '@/modules/notifications/services/publication/publishNotification'
import { initializePublicationQueue } from '@/modules/notifications/services/publication/queue'
import { NotificationType } from '@/modules/notifications/helpers/types'
import { publishNotification } from '@/modules/notifications/services/publication'
import { initializeQueue } from '@/modules/notifications/services/queue'
import type { CommandModule } from 'yargs'
const command: CommandModule = {
@@ -22,7 +22,7 @@ const command: CommandModule = {
})
},
handler: async (argv) => {
await initializePublicationQueue()
await initializeQueue()
// we don't want to submit a real mentions payload, this is for testing only
await publishNotification(NotificationType.MentionedInComment, {
@@ -11,7 +11,6 @@ export const commentEventsNamespace = 'comments' as const
export const CommentEvents = {
Created: `${commentEventsNamespace}.created`,
Updated: `${commentEventsNamespace}.updated`,
Viewed: `${commentEventsNamespace}.viewed`,
Archived: `${commentEventsNamespace}.archived`
} as const
@@ -26,10 +25,6 @@ export type CommentEventsPayloads = {
previousComment: CommentRecord
newComment: CommentRecord
}
[CommentEvents.Viewed]: {
userId: string
commentId: string
}
[CommentEvents.Archived]: {
userId: string
input: MutationCommentArchiveArgs
@@ -24,7 +24,7 @@ import {
insertCommentLinksFactory,
insertCommentsFactory,
markCommentUpdatedFactory,
markCommentViewedFactory as markCommentViewedFactoryDb,
markCommentViewedFactory,
resolvePaginatedProjectCommentsLatestModelResourcesFactory,
updateCommentFactory
} from '@/modules/comments/repositories/comments'
@@ -56,8 +56,7 @@ import {
createCommentThreadAndNotifyFactory,
createCommentReplyAndNotifyFactory,
editCommentAndNotifyFactory,
archiveCommentAndNotifyFactory,
markCommentViewedFactory
archiveCommentAndNotifyFactory
} from '@/modules/comments/services/management'
import {
isLegacyData,
@@ -534,10 +533,8 @@ export default {
throwIfAuthNotOk(canReadProject)
const projectDb = await getProjectDbClient({ projectId: args.input.projectId })
await markCommentViewedFactory({
markCommentViewed: markCommentViewedFactoryDb({ db: projectDb }),
emitEvent: getEventBus().emit
})(args.input.commentId, ctx.userId!)
const markCommentViewed = markCommentViewedFactory({ db: projectDb })
await markCommentViewed(args.input.commentId, ctx.userId!)
return true
},
@@ -566,7 +563,7 @@ export default {
})
const insertComments = insertCommentsFactory({ db: projectDb })
const insertCommentLinks = insertCommentLinksFactory({ db: projectDb })
const markCommentViewed = markCommentViewedFactoryDb({ db: projectDb })
const markCommentViewed = markCommentViewedFactory({ db: projectDb })
const createCommentThreadAndNotify = createCommentThreadAndNotifyFactory({
getViewerResourceItemsUngrouped,
@@ -807,7 +804,7 @@ export default {
insertComments: insertCommentsFactory({ db: projectDb }),
insertCommentLinks: insertCommentLinksFactory({ db: projectDb }),
deleteComment: deleteCommentFactory({ db: projectDb }),
markCommentViewed: markCommentViewedFactoryDb({ db: projectDb }),
markCommentViewed: markCommentViewedFactory({ db: projectDb }),
emitEvent: getEventBus().emit,
getViewerResourcesFromLegacyIdentifiers
})
@@ -869,7 +866,7 @@ export default {
throwIfAuthNotOk(canReadProject)
const projectDb = await getProjectDbClient({ projectId: args.streamId })
const markCommentViewed = markCommentViewedFactoryDb({ db: projectDb })
const markCommentViewed = markCommentViewedFactory({ db: projectDb })
await markCommentViewed(args.commentId, context.userId!)
return true
+1 -1
View File
@@ -11,7 +11,7 @@ import {
getViewerResourcesForCommentsFactory,
getViewerResourcesFromLegacyIdentifiersFactory
} from '@/modules/core/services/commit/viewerResources'
import { publishNotification } from '@/modules/notifications/services/publication/publishNotification'
import { publishNotification } from '@/modules/notifications/services/publication'
import type { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { publish } from '@/modules/shared/utils/subscriptions'
@@ -251,21 +251,3 @@ export const archiveCommentAndNotifyFactory =
return updatedComment
}
export const markCommentViewedFactory =
(deps: {
markCommentViewed: MarkCommentViewed
emitEvent: EventBusEmit
}): MarkCommentViewed =>
async (commentId: string, userId: string) => {
const updated = await deps.markCommentViewed(commentId, userId)
await deps.emitEvent({
eventName: CommentEvents.Viewed,
payload: {
commentId,
userId
}
})
return updated
}
@@ -1,8 +1,10 @@
import type { CommentRecord } from '@/modules/comments/helpers/types'
import { ensureCommentSchema } from '@/modules/comments/services/commentTextService'
import { flatten } from 'lodash-es'
import type { JSONContent } from '@tiptap/core'
import { iterateContentNodes } from '@/modules/core/services/richTextEditorService'
import { difference, flatten } from 'lodash-es'
import type { NotificationPublisher } from '@/modules/notifications/helpers/types'
import { NotificationType } from '@speckle/shared/notifications'
import { NotificationType } from '@/modules/notifications/helpers/types'
import type {
AddStreamCommentMentionActivity,
SaveStreamActivity
@@ -13,7 +15,30 @@ import {
StreamActionTypes,
StreamResourceTypes
} from '@/modules/activitystream/helpers/types'
import { processCommentMentions } from '@/modules/notifications/services/events/handlers/createdOrUpdatedComment'
function findMentionedUserIds(doc: JSONContent) {
const mentionedUserIds = new Set<string>()
for (const node of iterateContentNodes(doc)) {
if (node.type === 'mention') {
const uid = node.attrs?.id
if (uid) {
mentionedUserIds.add(uid)
}
}
}
return [...mentionedUserIds]
}
function collectMentionedUserIds(comment: CommentRecord): string[] {
if (!comment.text) return []
const { doc } = ensureCommentSchema(comment.text)
if (!doc) return []
return findMentionedUserIds(doc)
}
/**
* Save "user mentioned in stream comment" activity item
@@ -80,6 +105,20 @@ const sendNotificationsForUsersFactory =
)
}
const processCommentMentionsFactory =
(deps: SendNotificationsForUsersDeps) =>
async (newComment: CommentRecord, previousComment?: CommentRecord) => {
const newMentionedUserIds = collectMentionedUserIds(newComment)
const previouslyMentionedUserIds = previousComment
? collectMentionedUserIds(previousComment)
: []
const newMentions = difference(newMentionedUserIds, previouslyMentionedUserIds)
if (!newMentions.length) return
await sendNotificationsForUsersFactory(deps)(newMentions, newComment)
}
/**
* Hook into the comments lifecycle to generate notifications accordingly
* @returns Callback to invoke when you wish to stop listening for comments events
@@ -92,22 +131,19 @@ export const notifyUsersOnCommentEventsFactory =
}) =>
async () => {
const addStreamCommentMentionActivity = addStreamCommentMentionActivityFactory(deps)
const sendNotificationsForUsers = sendNotificationsForUsersFactory({
const processCommentMentions = processCommentMentionsFactory({
...deps,
addStreamCommentMentionActivity
})
const exitCbs = [
deps.eventBus.listen(CommentEvents.Created, async ({ payload: { comment } }) => {
const newMentions = processCommentMentions(comment)
if (newMentions.length) await sendNotificationsForUsers(newMentions, comment)
await processCommentMentions(comment)
}),
deps.eventBus.listen(
CommentEvents.Updated,
async ({ payload: { newComment, previousComment } }) => {
const newMentions = processCommentMentions(newComment, previousComment)
if (newMentions.length)
await sendNotificationsForUsers(newMentions, newComment)
await processCommentMentions(newComment, previousComment)
}
)
]
@@ -32,7 +32,7 @@ import {
buildNotificationsStateTracker,
purgeNotifications
} from '@/test/notificationsHelper'
import { NotificationType } from '@speckle/shared/notifications'
import { NotificationType } from '@/modules/notifications/helpers/types'
import type { ServerAndContext } from '@/test/graphqlHelper'
import { createAuthedTestContext } from '@/test/graphqlHelper'
import {
@@ -98,7 +98,6 @@ import {
import type { GetCommentsQueryVariables } from '@/modules/core/graph/generated/graphql'
import type { BasicTestStream } from '@/test/speckle-helpers/streamHelper'
import { createTestStream } from '@/test/speckle-helpers/streamHelper'
import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper'
const getStream = getStreamFactory({ db })
const streamResourceCheck = streamResourceCheckFactory({
@@ -1819,32 +1818,28 @@ describe('Comments @comments', () => {
...input
})
!isNotificationListenerEnabled()
? it('a valid mention triggers a notification', async () => {
const { getSends } = emailListener.listen({ times: 2 })
it('a valid mention triggers a notification', async () => {
const { getSends } = emailListener.listen({ times: 2 })
const waitForAck = notificationsState.waitForAck(
(e) => e.result?.type === NotificationType.MentionedInComment
)
const waitForAck = notificationsState.waitForAck(
(e) => e.result?.type === NotificationType.MentionedInComment
)
const { data, errors } = await createOrReplyCommentWithMention(
otherUser.id
)
const result = getResult(data)
const { data, errors } = await createOrReplyCommentWithMention(otherUser.id)
const result = getResult(data)
expect(errors).to.be.not.ok
expect(result).to.be.ok
expect(errors).to.be.not.ok
expect(result).to.be.ok
// Wait for
await waitForAck
// Wait for
await waitForAck
const emailSends = getSends()
const emailParams = emailSends[0]
expect(emailParams).to.be.ok
expect(emailParams.subject).to.contain('mentioned in a Speckle comment')
expect(emailParams.to).to.eq(otherUser.email)
})
: {}
const emailSends = getSends()
const emailParams = emailSends[0]
expect(emailParams).to.be.ok
expect(emailParams.subject).to.contain('mentioned in a Speckle comment')
expect(emailParams.to).to.eq(otherUser.email)
})
})
})
})
-12
View File
@@ -478,18 +478,6 @@ export const StreamActivity = buildTableHelper('stream_activity', [
'message'
])
export const UserNotifications = buildTableHelper('user_notifications', [
'id',
'userId',
'type',
'read',
'version',
'payload',
'sendEmailAt',
'createdAt',
'updatedAt'
])
export const UserNotificationPreferences = buildTableHelper(
'user_notification_preferences',
['userId', 'preferences']
@@ -2091,7 +2091,6 @@ export type Mutation = {
*/
inviteResend: Scalars['Boolean']['output'];
modelMutations: ModelMutations;
notificationMutations: NotificationMutations;
/** @deprecated Part of the old API surface and will be removed in the future. */
objectCreate: Array<Scalars['String']['output']>;
projectMutations: ProjectMutations;
@@ -2485,39 +2484,6 @@ export type MutationWebhookUpdateArgs = {
webhook: WebhookUpdateInput;
};
export type Notification = {
__typename?: 'Notification';
createdAt: Scalars['DateTime']['output'];
id: Scalars['ID']['output'];
payload: Scalars['JSONObject']['output'];
read: Scalars['Boolean']['output'];
type: Scalars['String']['output'];
updatedAt: Scalars['DateTime']['output'];
};
export type NotificationMutations = {
__typename?: 'NotificationMutations';
/** Delete an existing notification */
bulkDelete: Scalars['Boolean']['output'];
/** update notidication */
bulkUpdate: Scalars['Boolean']['output'];
};
export type NotificationMutationsBulkDeleteArgs = {
ids: Array<Scalars['String']['input']>;
};
export type NotificationMutationsBulkUpdateArgs = {
input: Array<NotificationUpdateInput>;
};
export type NotificationUpdateInput = {
id: Scalars['ID']['input'];
read: Scalars['Boolean']['input'];
};
export type Object = {
__typename?: 'Object';
/** @deprecated Not implemented. */
@@ -5005,8 +4971,6 @@ export type User = {
meta: UserMeta;
name: Scalars['String']['output'];
notificationPreferences: Scalars['JSONObject']['output'];
/** List all notifications for the user */
notifications: UserNotificationCollection;
permissions: RootPermissionChecks;
profiles?: Maybe<Scalars['JSONObject']['output']>;
/** Get pending project access request, that the user made */
@@ -5092,16 +5056,6 @@ export type UserFavoriteStreamsArgs = {
};
/**
* Full user type, should only be used in the context of admin operations or
* when a user is reading/writing info about himself
*/
export type UserNotificationsArgs = {
cursor?: InputMaybe<Scalars['String']['input']>;
limit?: InputMaybe<Scalars['Int']['input']>;
};
/**
* Full user type, should only be used in the context of admin operations or
* when a user is reading/writing info about himself
@@ -5292,13 +5246,6 @@ export type UserMetaMutationsSetSpeckleConBannerDismissedArgs = {
value: Scalars['Boolean']['input'];
};
export type UserNotificationCollection = {
__typename?: 'UserNotificationCollection';
cursor?: Maybe<Scalars['String']['output']>;
items: Array<Notification>;
totalCount: Scalars['Int']['output'];
};
export type UserProjectCollection = {
__typename?: 'UserProjectCollection';
cursor?: Maybe<Scalars['String']['output']>;
@@ -6619,9 +6566,6 @@ export type ResolversTypes = {
ModelsTreeItemCollection: ResolverTypeWrapper<Omit<ModelsTreeItemCollection, 'items'> & { items: Array<ResolversTypes['ModelsTreeItem']> }>;
MoveVersionsInput: MoveVersionsInput;
Mutation: ResolverTypeWrapper<{}>;
Notification: ResolverTypeWrapper<Notification>;
NotificationMutations: ResolverTypeWrapper<MutationsObjectGraphQLReturn>;
NotificationUpdateInput: NotificationUpdateInput;
Object: ResolverTypeWrapper<ObjectGraphQLReturn>;
ObjectCollection: ResolverTypeWrapper<Omit<ObjectCollection, 'objects'> & { objects: Array<ResolversTypes['Object']> }>;
ObjectCreateInput: ObjectCreateInput;
@@ -6764,7 +6708,6 @@ export type ResolversTypes = {
UserGendoAICredits: ResolverTypeWrapper<UserGendoAiCredits>;
UserMeta: ResolverTypeWrapper<UserMetaGraphQLReturn>;
UserMetaMutations: ResolverTypeWrapper<MutationsObjectGraphQLReturn>;
UserNotificationCollection: ResolverTypeWrapper<UserNotificationCollection>;
UserProjectCollection: ResolverTypeWrapper<Omit<UserProjectCollection, 'items'> & { items: Array<ResolversTypes['Project']> }>;
UserProjectsFilter: UserProjectsFilter;
UserProjectsUpdatedMessage: ResolverTypeWrapper<Omit<UserProjectsUpdatedMessage, 'project'> & { project?: Maybe<ResolversTypes['Project']> }>;
@@ -7036,9 +6979,6 @@ export type ResolversParentTypes = {
ModelsTreeItemCollection: Omit<ModelsTreeItemCollection, 'items'> & { items: Array<ResolversParentTypes['ModelsTreeItem']> };
MoveVersionsInput: MoveVersionsInput;
Mutation: {};
Notification: Notification;
NotificationMutations: MutationsObjectGraphQLReturn;
NotificationUpdateInput: NotificationUpdateInput;
Object: ObjectGraphQLReturn;
ObjectCollection: Omit<ObjectCollection, 'objects'> & { objects: Array<ResolversParentTypes['Object']> };
ObjectCreateInput: ObjectCreateInput;
@@ -7161,7 +7101,6 @@ export type ResolversParentTypes = {
UserGendoAICredits: UserGendoAiCredits;
UserMeta: UserMetaGraphQLReturn;
UserMetaMutations: MutationsObjectGraphQLReturn;
UserNotificationCollection: UserNotificationCollection;
UserProjectCollection: Omit<UserProjectCollection, 'items'> & { items: Array<ResolversParentTypes['Project']> };
UserProjectsFilter: UserProjectsFilter;
UserProjectsUpdatedMessage: Omit<UserProjectsUpdatedMessage, 'project'> & { project?: Maybe<ResolversParentTypes['Project']> };
@@ -8184,7 +8123,6 @@ export type MutationResolvers<ContextType = GraphQLContext, ParentType extends R
inviteDelete?: Resolver<ResolversTypes['Boolean'], ParentType, ContextType, RequireFields<MutationInviteDeleteArgs, 'inviteId'>>;
inviteResend?: Resolver<ResolversTypes['Boolean'], ParentType, ContextType, RequireFields<MutationInviteResendArgs, 'inviteId'>>;
modelMutations?: Resolver<ResolversTypes['ModelMutations'], ParentType, ContextType>;
notificationMutations?: Resolver<ResolversTypes['NotificationMutations'], ParentType, ContextType>;
objectCreate?: Resolver<Array<ResolversTypes['String']>, ParentType, ContextType, RequireFields<MutationObjectCreateArgs, 'objectInput'>>;
projectMutations?: Resolver<ResolversTypes['ProjectMutations'], ParentType, ContextType>;
requestVerification?: Resolver<ResolversTypes['Boolean'], ParentType, ContextType>;
@@ -8221,22 +8159,6 @@ export type MutationResolvers<ContextType = GraphQLContext, ParentType extends R
workspaceMutations?: Resolver<ResolversTypes['WorkspaceMutations'], ParentType, ContextType>;
};
export type NotificationResolvers<ContextType = GraphQLContext, ParentType extends ResolversParentTypes['Notification'] = ResolversParentTypes['Notification']> = {
createdAt?: Resolver<ResolversTypes['DateTime'], ParentType, ContextType>;
id?: Resolver<ResolversTypes['ID'], ParentType, ContextType>;
payload?: Resolver<ResolversTypes['JSONObject'], ParentType, ContextType>;
read?: Resolver<ResolversTypes['Boolean'], ParentType, ContextType>;
type?: Resolver<ResolversTypes['String'], ParentType, ContextType>;
updatedAt?: Resolver<ResolversTypes['DateTime'], ParentType, ContextType>;
__isTypeOf?: IsTypeOfResolverFn<ParentType, ContextType>;
};
export type NotificationMutationsResolvers<ContextType = GraphQLContext, ParentType extends ResolversParentTypes['NotificationMutations'] = ResolversParentTypes['NotificationMutations']> = {
bulkDelete?: Resolver<ResolversTypes['Boolean'], ParentType, ContextType, RequireFields<NotificationMutationsBulkDeleteArgs, 'ids'>>;
bulkUpdate?: Resolver<ResolversTypes['Boolean'], ParentType, ContextType, RequireFields<NotificationMutationsBulkUpdateArgs, 'input'>>;
__isTypeOf?: IsTypeOfResolverFn<ParentType, ContextType>;
};
export type ObjectResolvers<ContextType = GraphQLContext, ParentType extends ResolversParentTypes['Object'] = ResolversParentTypes['Object']> = {
applicationId?: Resolver<Maybe<ResolversTypes['String']>, ParentType, ContextType>;
children?: Resolver<ResolversTypes['ObjectCollection'], ParentType, ContextType, RequireFields<ObjectChildrenArgs, 'depth' | 'limit'>>;
@@ -9033,7 +8955,6 @@ export type UserResolvers<ContextType = GraphQLContext, ParentType extends Resol
meta?: Resolver<ResolversTypes['UserMeta'], ParentType, ContextType>;
name?: Resolver<ResolversTypes['String'], ParentType, ContextType>;
notificationPreferences?: Resolver<ResolversTypes['JSONObject'], ParentType, ContextType>;
notifications?: Resolver<ResolversTypes['UserNotificationCollection'], ParentType, ContextType, Partial<UserNotificationsArgs>>;
permissions?: Resolver<ResolversTypes['RootPermissionChecks'], ParentType, ContextType>;
profiles?: Resolver<Maybe<ResolversTypes['JSONObject']>, ParentType, ContextType>;
projectAccessRequest?: Resolver<Maybe<ResolversTypes['ProjectAccessRequest']>, ParentType, ContextType, RequireFields<UserProjectAccessRequestArgs, 'projectId'>>;
@@ -9102,13 +9023,6 @@ export type UserMetaMutationsResolvers<ContextType = GraphQLContext, ParentType
__isTypeOf?: IsTypeOfResolverFn<ParentType, ContextType>;
};
export type UserNotificationCollectionResolvers<ContextType = GraphQLContext, ParentType extends ResolversParentTypes['UserNotificationCollection'] = ResolversParentTypes['UserNotificationCollection']> = {
cursor?: Resolver<Maybe<ResolversTypes['String']>, ParentType, ContextType>;
items?: Resolver<Array<ResolversTypes['Notification']>, ParentType, ContextType>;
totalCount?: Resolver<ResolversTypes['Int'], ParentType, ContextType>;
__isTypeOf?: IsTypeOfResolverFn<ParentType, ContextType>;
};
export type UserProjectCollectionResolvers<ContextType = GraphQLContext, ParentType extends ResolversParentTypes['UserProjectCollection'] = ResolversParentTypes['UserProjectCollection']> = {
cursor?: Resolver<Maybe<ResolversTypes['String']>, ParentType, ContextType>;
items?: Resolver<Array<ResolversTypes['Project']>, ParentType, ContextType>;
@@ -9627,8 +9541,6 @@ export type Resolvers<ContextType = GraphQLContext> = {
ModelsTreeItem?: ModelsTreeItemResolvers<ContextType>;
ModelsTreeItemCollection?: ModelsTreeItemCollectionResolvers<ContextType>;
Mutation?: MutationResolvers<ContextType>;
Notification?: NotificationResolvers<ContextType>;
NotificationMutations?: NotificationMutationsResolvers<ContextType>;
Object?: ObjectResolvers<ContextType>;
ObjectCollection?: ObjectCollectionResolvers<ContextType>;
PasswordStrengthCheckFeedback?: PasswordStrengthCheckFeedbackResolvers<ContextType>;
@@ -9709,7 +9621,6 @@ export type Resolvers<ContextType = GraphQLContext> = {
UserGendoAICredits?: UserGendoAiCreditsResolvers<ContextType>;
UserMeta?: UserMetaResolvers<ContextType>;
UserMetaMutations?: UserMetaMutationsResolvers<ContextType>;
UserNotificationCollection?: UserNotificationCollectionResolvers<ContextType>;
UserProjectCollection?: UserProjectCollectionResolvers<ContextType>;
UserProjectsUpdatedMessage?: UserProjectsUpdatedMessageResolvers<ContextType>;
UserSearchResultCollection?: UserSearchResultCollectionResolvers<ContextType>;
@@ -10726,28 +10637,6 @@ export type GetRegionalProjectBlobQueryVariables = Exact<{
export type GetRegionalProjectBlobQuery = { __typename?: 'Query', project: { __typename?: 'Project', id: string, blob?: { __typename?: 'BlobMetadata', id: string, fileName: string } | null } };
export type GetUserNotificationsQueryVariables = Exact<{
limit?: InputMaybe<Scalars['Int']['input']>;
cursor?: InputMaybe<Scalars['String']['input']>;
}>;
export type GetUserNotificationsQuery = { __typename?: 'Query', activeUser?: { __typename?: 'User', notifications: { __typename?: 'UserNotificationCollection', cursor?: string | null, totalCount: number, items: Array<{ __typename?: 'Notification', id: string, type: string, createdAt: Date, payload: Record<string, unknown>, read: boolean, updatedAt: Date }> } } | null };
export type UserBulkDeleteNotidicationMutationVariables = Exact<{
ids: Array<Scalars['String']['input']> | Scalars['String']['input'];
}>;
export type UserBulkDeleteNotidicationMutation = { __typename?: 'Mutation', notificationMutations: { __typename?: 'NotificationMutations', bulkDelete: boolean } };
export type UserBulkUpdateNotificationsMutationVariables = Exact<{
input: Array<NotificationUpdateInput> | NotificationUpdateInput;
}>;
export type UserBulkUpdateNotificationsMutation = { __typename?: 'Mutation', notificationMutations: { __typename?: 'NotificationMutations', bulkUpdate: boolean } };
export type BasicProjectAccessRequestFieldsFragment = { __typename?: 'ProjectAccessRequest', id: string, requesterId: string, projectId: string, createdAt: Date, requester: { __typename?: 'LimitedUser', id: string, name: string } };
export type CreateProjectAccessRequestMutationVariables = Exact<{
@@ -11438,9 +11327,6 @@ export const GetRegionalProjectAutomationDocument = {"kind":"Document","definiti
export const GetRegionalProjectCommentDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetRegionalProjectComment"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}},{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"commentId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"project"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"comment"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"commentId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}}]}}]}}]}}]} as unknown as DocumentNode<GetRegionalProjectCommentQuery, GetRegionalProjectCommentQueryVariables>;
export const GetRegionalProjectWebhookDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetRegionalProjectWebhook"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}},{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"webhookId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"project"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"webhooks"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"webhookId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"items"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}}]}}]}}]}}]}}]} as unknown as DocumentNode<GetRegionalProjectWebhookQuery, GetRegionalProjectWebhookQueryVariables>;
export const GetRegionalProjectBlobDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetRegionalProjectBlob"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}},{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"blobId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"project"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"blob"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"blobId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"fileName"}}]}}]}}]}}]} as unknown as DocumentNode<GetRegionalProjectBlobQuery, GetRegionalProjectBlobQueryVariables>;
export const GetUserNotificationsDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetUserNotifications"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"limit"}},"type":{"kind":"NamedType","name":{"kind":"Name","value":"Int"}}},{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"cursor"}},"type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"activeUser"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"notifications"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"limit"},"value":{"kind":"Variable","name":{"kind":"Name","value":"limit"}}},{"kind":"Argument","name":{"kind":"Name","value":"cursor"},"value":{"kind":"Variable","name":{"kind":"Name","value":"cursor"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"items"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"type"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"payload"}},{"kind":"Field","name":{"kind":"Name","value":"read"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}}]}},{"kind":"Field","name":{"kind":"Name","value":"cursor"}},{"kind":"Field","name":{"kind":"Name","value":"totalCount"}}]}}]}}]}}]} as unknown as DocumentNode<GetUserNotificationsQuery, GetUserNotificationsQueryVariables>;
export const UserBulkDeleteNotidicationDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"mutation","name":{"kind":"Name","value":"UserBulkDeleteNotidication"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"ids"}},"type":{"kind":"NonNullType","type":{"kind":"ListType","type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"notificationMutations"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"bulkDelete"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"ids"},"value":{"kind":"Variable","name":{"kind":"Name","value":"ids"}}}]}]}}]}}]} as unknown as DocumentNode<UserBulkDeleteNotidicationMutation, UserBulkDeleteNotidicationMutationVariables>;
export const UserBulkUpdateNotificationsDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"mutation","name":{"kind":"Name","value":"UserBulkUpdateNotifications"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"input"}},"type":{"kind":"NonNullType","type":{"kind":"ListType","type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"NotificationUpdateInput"}}}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"notificationMutations"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"bulkUpdate"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"input"},"value":{"kind":"Variable","name":{"kind":"Name","value":"input"}}}]}]}}]}}]} as unknown as DocumentNode<UserBulkUpdateNotificationsMutation, UserBulkUpdateNotificationsMutationVariables>;
export const CreateProjectAccessRequestDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"mutation","name":{"kind":"Name","value":"CreateProjectAccessRequest"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"projectMutations"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"accessRequestMutations"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"create"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"projectId"},"value":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"FragmentSpread","name":{"kind":"Name","value":"BasicProjectAccessRequestFields"}}]}}]}}]}}]}},{"kind":"FragmentDefinition","name":{"kind":"Name","value":"BasicProjectAccessRequestFields"},"typeCondition":{"kind":"NamedType","name":{"kind":"Name","value":"ProjectAccessRequest"}},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"requester"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"name"}}]}},{"kind":"Field","name":{"kind":"Name","value":"requesterId"}},{"kind":"Field","name":{"kind":"Name","value":"projectId"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}}]}}]} as unknown as DocumentNode<CreateProjectAccessRequestMutation, CreateProjectAccessRequestMutationVariables>;
export const GetActiveUserProjectAccessRequestDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetActiveUserProjectAccessRequest"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"activeUser"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"projectAccessRequest"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"projectId"},"value":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"FragmentSpread","name":{"kind":"Name","value":"BasicProjectAccessRequestFields"}}]}}]}}]}},{"kind":"FragmentDefinition","name":{"kind":"Name","value":"BasicProjectAccessRequestFields"},"typeCondition":{"kind":"NamedType","name":{"kind":"Name","value":"ProjectAccessRequest"}},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"requester"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"name"}}]}},{"kind":"Field","name":{"kind":"Name","value":"requesterId"}},{"kind":"Field","name":{"kind":"Name","value":"projectId"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}}]}}]} as unknown as DocumentNode<GetActiveUserProjectAccessRequestQuery, GetActiveUserProjectAccessRequestQueryVariables>;
export const GetActiveUserFullProjectAccessRequestDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetActiveUserFullProjectAccessRequest"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"activeUser"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"projectAccessRequest"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"projectId"},"value":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"FragmentSpread","name":{"kind":"Name","value":"BasicProjectAccessRequestFields"}},{"kind":"Field","name":{"kind":"Name","value":"project"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"name"}}]}}]}}]}}]}},{"kind":"FragmentDefinition","name":{"kind":"Name","value":"BasicProjectAccessRequestFields"},"typeCondition":{"kind":"NamedType","name":{"kind":"Name","value":"ProjectAccessRequest"}},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"requester"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"name"}}]}},{"kind":"Field","name":{"kind":"Name","value":"requesterId"}},{"kind":"Field","name":{"kind":"Name","value":"projectId"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}}]}}]} as unknown as DocumentNode<GetActiveUserFullProjectAccessRequestQuery, GetActiveUserFullProjectAccessRequestQueryVariables>;
-10
View File
@@ -33,16 +33,6 @@ export default [
description: "Read other users' profiles.",
public: true
},
{
name: Scopes.Notifications.Read,
description: 'Required to see your notifications.',
public: true
},
{
name: Scopes.Notifications.Write,
description: 'Required to update your notifications.',
public: true
},
{
name: Scopes.Server.Stats,
description:
@@ -1,12 +1,4 @@
import type {
BaseUserNotification,
NotificationChannel,
NotificationPreferences,
UserNotificationRecord
} from '@/modules/notifications/helpers/types'
import type { MaybeNullOrUndefined } from '@speckle/shared'
import type { NotificationType } from '@speckle/shared/notifications'
import type { Exact } from 'type-fest'
import type { NotificationPreferences } from '@/modules/notifications/helpers/types'
export type GetSavedUserNotificationPreferences = (
userId: string
@@ -20,43 +12,3 @@ export type SaveUserNotificationPreferences = (
export type GetUserNotificationPreferences = (
userId: string
) => Promise<NotificationPreferences>
export type GetUserPreferenceForNotificationType = (
userId: string,
notificationType: NotificationType,
notificationChannel: NotificationChannel
) => Promise<boolean>
export type GetUserNotifications = (args: {
userId: string
cursor: string | null
limit: number | null
}) => Promise<{ items: BaseUserNotification[]; cursor: string | null }>
export type GetUserNotificationsCount = (args: { userId: string }) => Promise<number>
export type GetNextEmailNotification = () => Promise<
MaybeNullOrUndefined<BaseUserNotification>
>
export type MarkCommentNotificationAsRead = (args: {
userId: string
commentId: string
}) => Promise<number>
export type DeleteUserNotifications = (args: {
userId: string
ids: string[]
}) => Promise<void>
export type StoreUserNotifications = <
Notification extends Exact<UserNotificationRecord, Notification>
>(
notifications: Notification[]
) => Promise<number>
export type UpdateUserNotification = (args: {
id: string
userId: string
update: Partial<Omit<UserNotificationRecord, 'id' | 'createdAt'>>
}) => Promise<UserNotificationRecord>
@@ -1,55 +0,0 @@
import { CommentEvents } from '@/modules/comments/domain/events'
import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper'
import type { EventBus, EventPayload } from '@/modules/shared/services/eventBus'
import { publishEventMessage } from '@/modules/notifications/services/events/queue'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'
import type { Logger } from '@/observability/logging'
import { markCommentNotificationsAsReadFactory } from '@/modules/notifications/repositories/userNotification'
import type { MarkCommentNotificationAsRead } from '@/modules/notifications/domain/operations'
import { db } from '@/db/knex'
export type NotificationEvents = EventPayload<
| typeof CommentEvents.Created
| typeof CommentEvents.Updated
| typeof AccessRequestEvents.Created
| typeof AccessRequestEvents.Finalized
>
const onEventTriggersNotificationFactory =
({ logger }: { logger: Logger }) =>
async (event: NotificationEvents) => {
if (!isNotificationListenerEnabled()) return
logger.debug('Notification triggered for event', event)
await publishEventMessage(event)
}
const onCommentViewedFactory =
(deps: { markCommentNotificationsAsRead: MarkCommentNotificationAsRead }) =>
async (event: EventPayload<typeof CommentEvents.Viewed>) => {
if (!isNotificationListenerEnabled()) return
await deps.markCommentNotificationsAsRead(event.payload)
}
export const notificationListenersFactory =
(deps: { eventBus: EventBus; logger: Logger }) => () => {
const onEventTriggersNotification = onEventTriggersNotificationFactory({
logger: deps.logger
})
const onCommentViewed = onCommentViewedFactory({
markCommentNotificationsAsRead: markCommentNotificationsAsReadFactory({ db })
})
const cbs = [
deps.eventBus.listen(CommentEvents.Created, onEventTriggersNotification),
deps.eventBus.listen(CommentEvents.Updated, onEventTriggersNotification),
deps.eventBus.listen(AccessRequestEvents.Created, onEventTriggersNotification),
deps.eventBus.listen(AccessRequestEvents.Finalized, onEventTriggersNotification),
deps.eventBus.listen(CommentEvents.Viewed, onCommentViewed)
]
return () => cbs.forEach((cb) => cb())
}
@@ -3,7 +3,7 @@ import type { Resolvers } from '@/modules/core/graph/generated/graphql'
import {
getSavedUserNotificationPreferencesFactory,
saveUserNotificationPreferencesFactory
} from '@/modules/notifications/repositories/userNotificationPreferences'
} from '@/modules/notifications/repositories'
import {
getUserNotificationPreferencesFactory,
updateNotificationPreferencesFactory
@@ -30,7 +30,7 @@ export default {
Mutation: {
async userNotificationPreferencesUpdate(_parent, args, context) {
const logger = context.log
await withOperationLogging(
await await withOperationLogging(
async () => updateNotificationPreferences(context.userId!, args.preferences),
{
logger,
@@ -1,98 +0,0 @@
import { db } from '@/db/knex'
import type { Resolvers } from '@/modules/core/graph/generated/graphql'
import { parseNotificationToLatestVersion } from '@/modules/notifications/helpers/toLatestVersion'
import {
deleteUserNotificationsFactory,
getUserNotificationsCountFactory,
getUserNotificationsFactory,
updateUserNotificationFactory
} from '@/modules/notifications/repositories/userNotification'
import { asOperation } from '@/modules/shared/command'
import { chunk } from 'lodash-es'
const getUserNotifications = getUserNotificationsFactory({ db })
const deleteUserNotifications = deleteUserNotificationsFactory({ db })
const updateUserNotification = updateUserNotificationFactory({ db })
const getUserNotificationsCount = getUserNotificationsCountFactory({ db })
const resolvers: Resolvers = {
User: {
async notifications(parent, args) {
const [totalCount, { items, cursor }] = await Promise.all([
await getUserNotificationsCount({ userId: parent.id }),
await getUserNotifications({
userId: parent.id,
cursor: args.cursor || null,
limit: args.limit || null
})
])
return {
totalCount,
cursor,
items: items.map(parseNotificationToLatestVersion)
}
}
},
Mutation: {
notificationMutations: () => ({})
},
NotificationMutations: {
async bulkDelete(_parent, { ids }, context) {
await asOperation(
async () => {
await deleteUserNotifications({
userId: context.userId!,
ids
})
},
{
logger: context.log,
name: 'userNotificationPreferencesUpdate',
description: 'deleting user notifications'
}
)
return true
},
async bulkUpdate(_parent, args, context) {
await asOperation(
async () => {
const inputBatches = chunk(args.input, 10)
for (const batch of inputBatches) {
await Promise.all(
batch.map(({ id, read }) => {
let update = {}
if (read === false) {
update = {
read: false,
sendEmailAt: null
}
} else {
update = {
read: true
}
}
return updateUserNotification({
userId: context.userId!,
id,
update
})
})
)
}
},
{
logger: context.log,
name: 'userNotificationPreferencesUpdate',
description: 'marking user notifications as read'
}
)
return true
}
}
}
export default resolvers
@@ -1,45 +0,0 @@
import type {
BaseUserNotification,
UserNotificationRecord
} from '@/modules/notifications/helpers/types'
import { LatestNotificationVersions } from '@/modules/notifications/helpers/types'
import { logger } from '@/observability/logging'
import type { Nullable } from '@speckle/shared'
export const ensureNotificationToLatestVersion = (
notification: BaseUserNotification
): Nullable<UserNotificationRecord> => {
const latestVersion = LatestNotificationVersions[notification.type]
if (notification.version === latestVersion) {
return notification as UserNotificationRecord
}
logger.error(
{
notification,
latestVersion
},
'No notification backward compatibility was configured'
)
return null
}
export const parseNotificationToLatestVersion = (
notification: BaseUserNotification
): UserNotificationRecord => {
const latestVersion = LatestNotificationVersions[notification.type]
if (notification.version === latestVersion) {
return notification as UserNotificationRecord
}
logger.warn(
{
notification,
latestVersion
},
'No notification backward compatibility was configured'
)
return notification as UserNotificationRecord
}
@@ -4,8 +4,13 @@ import type { MaybeAsync, Optional } from '@/modules/shared/helpers/typeHelper'
import type { Job } from 'bull'
import { isObject, has } from 'lodash-es'
import type { Logger } from 'pino'
import type { NotificationPayloadMap } from '@speckle/shared/notifications'
import { NotificationType } from '@speckle/shared/notifications'
export enum NotificationType {
ActivityDigest = 'activityDigest',
MentionedInComment = 'mentionedInComment',
NewStreamAccessRequest = 'newStreamAccessRequest',
StreamAccessRequestApproved = 'streamAccessRequestApproved'
}
export enum NotificationChannel {
Email = 'email'
@@ -20,37 +25,6 @@ export type UserNotificationPreferencesRecord = {
preferences: NotificationPreferences
}
export type BaseUserNotification = {
id: string
userId: string
type: NotificationType
read: boolean
version: string
payload: object
sendEmailAt: Date | null
createdAt: Date
updatedAt: Date
}
export type UserNotificationRecord = {
[K in keyof NotificationPayloadMap]: Omit<
BaseUserNotification,
'payload' | 'version' | 'type'
> & {
type: K
version: (typeof LatestNotificationVersions)[K]
payload: NotificationPayloadMap[K]
}
}[keyof NotificationPayloadMap]
const DEFAULT_VERSION = '1' as const
export const LatestNotificationVersions = {
[NotificationType.MentionedInComment]: DEFAULT_VERSION,
[NotificationType.NewStreamAccessRequest]: DEFAULT_VERSION,
[NotificationType.StreamAccessRequestApproved]: DEFAULT_VERSION,
[NotificationType.ActivityDigest]: DEFAULT_VERSION
}
// Add mappings between NotificationTypes and expected Message types here
export type NotificationTypeMessageMap = {
[NotificationType.MentionedInComment]: MentionedInCommentMessage
+13 -38
View File
@@ -1,31 +1,20 @@
import {
initializePublicationQueue,
initializeQueue,
consumeIncomingNotifications,
registerNotificationHandlers,
shutdownPublicationQueue
} from '@/modules/notifications/services/publication/queue'
shutdownQueue
} from '@/modules/notifications/services/queue'
import type { NotificationTypeHandlers } from '@/modules/notifications/helpers/types'
import { NotificationType } from '@/modules/notifications/helpers/types'
import type { SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { shouldDisableNotificationsConsumption } from '@/modules/shared/helpers/envHelper'
import { moduleLogger, notificationsLogger } from '@/observability/logging'
import MentionedInCommentHandler from '@/modules/notifications/services/publication/handlers/mentionedInComment'
import NewStreamAccessRequestHandler from '@/modules/notifications/services/publication/handlers/newStreamAccessRequest'
import StreamAccessRequestApprovedHandler from '@/modules/notifications/services/publication/handlers/streamAccessRequestApproved'
import ActivityDigestHandler from '@/modules/notifications/services/publication/handlers/activityDigest'
import {
initializeNotificationEventsConsumption,
initializeNotificationEventsQueue,
shutdownEventQueue
} from '@/modules/notifications/services/events/queue'
import { notificationListenersFactory } from '@/modules/notifications/events/notificationListener'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { scheduleDelayedEmailNotifications } from '@/modules/notifications/tasks/delayedNotifications'
import type cron from 'node-cron'
import { NotificationType } from '@speckle/shared/notifications'
import { moduleLogger } from '@/observability/logging'
import MentionedInCommentHandler from '@/modules/notifications/services/handlers/mentionedInComment'
import NewStreamAccessRequestHandler from '@/modules/notifications/services/handlers/newStreamAccessRequest'
import StreamAccessRequestApprovedHandler from '@/modules/notifications/services/handlers/streamAccessRequestApproved'
import ActivityDigestHandler from '@/modules/notifications/services/handlers/activityDigest'
let scheduledTasks: cron.ScheduledTask[] = []
export async function initializePublicationConsumption(
export async function initializeConsumption(
customHandlers?: Partial<NotificationTypeHandlers>
) {
moduleLogger.info('📞 Initializing notification queue consumption...')
@@ -39,7 +28,7 @@ export async function initializePublicationConsumption(
registerNotificationHandlers(customHandlers || allHandlers)
await initializePublicationQueue()
await initializeQueue()
if (shouldDisableNotificationsConsumption()) {
moduleLogger.info('Skipping notification consumption...')
@@ -51,24 +40,10 @@ export async function initializePublicationConsumption(
export const init: SpeckleModule['init'] = async ({ isInitial }) => {
moduleLogger.info('📞 Init notifications module')
if (isInitial) {
await initializePublicationConsumption()
await initializeNotificationEventsQueue()
await initializeNotificationEventsConsumption()
notificationListenersFactory({
eventBus: getEventBus(),
logger: notificationsLogger
})()
scheduledTasks = [await scheduleDelayedEmailNotifications()]
await initializeConsumption()
}
}
export const shutdown: SpeckleModule['shutdown'] = async () => {
await shutdownPublicationQueue()
await shutdownEventQueue()
scheduledTasks.forEach((task) => {
task.stop()
})
await shutdownQueue()
}
@@ -1,126 +0,0 @@
import { UserNotifications } from '@/modules/core/dbSchema'
import type {
DeleteUserNotifications,
GetNextEmailNotification,
GetUserNotifications,
GetUserNotificationsCount,
MarkCommentNotificationAsRead,
StoreUserNotifications,
UpdateUserNotification
} from '@/modules/notifications/domain/operations'
import { type UserNotificationRecord } from '@/modules/notifications/helpers/types'
import { compositeCursorTools } from '@/modules/shared/helpers/dbHelper'
import { isNullOrUndefined } from '@speckle/shared'
import { NotificationType } from '@speckle/shared/notifications'
import { type Knex } from 'knex'
import { clamp, pick } from 'lodash-es'
const tables = {
userNotifications: (db: Knex) => db<UserNotificationRecord>(UserNotifications.name)
}
const getCursorTools = () =>
compositeCursorTools({
schema: UserNotifications,
cols: ['createdAt', 'id']
})
export const getUserNotificationsFactory =
(deps: { db: Knex }): GetUserNotifications =>
async (args) => {
const limit = clamp(isNullOrUndefined(args.limit) ? 10 : args.limit, 0, 50)
const { applyCursorSortAndFilter, resolveNewCursor } = getCursorTools()
const q = tables
.userNotifications(deps.db)
.where({ userId: args.userId })
.limit(limit)
applyCursorSortAndFilter({
query: q,
cursor: args.cursor
})
const items = await q
const newCursor = resolveNewCursor(items)
return {
items,
cursor: newCursor
}
}
export const getUserNotificationsCountFactory =
(deps: { db: Knex }): GetUserNotificationsCount =>
async ({ userId }) => {
const [res] = await tables.userNotifications(deps.db).where({ userId }).count()
return parseInt(res.count.toString())
}
export const storeUserNotificationsFactory =
(deps: { db: Knex }): StoreUserNotifications =>
async (args) => {
const notifications = await deps.db(UserNotifications.name).insert(args)
return notifications.length
}
export const updateUserNotificationFactory =
(deps: { db: Knex }): UpdateUserNotification =>
async ({ userId, id, update }) => {
const [notification] = await tables
.userNotifications(deps.db)
.where({ userId, id })
.update(
pick(
update,
UserNotifications.withoutTablePrefix.cols
) as Partial<UserNotificationRecord>
)
.returning('*')
return notification
}
/**
* This function retrieves the next email notification for a user.
* If used from different transactions, it will skip locked rows, and if there is a notification it locks it for the transaction context.
* This means that two processes querying for the next email notification (if both use a transaction to send the email etc) will not pick up the same notification.
*/
export const getNextEmailNotificationFactory =
(deps: { db: Knex }): GetNextEmailNotification =>
async () => {
const notification = await tables
.userNotifications(deps.db)
.where(UserNotifications.col.sendEmailAt, '<=', new Date())
.andWhere(UserNotifications.col.read, false)
.forUpdate()
.skipLocked()
.first()
return notification
}
export const deleteUserNotificationsFactory =
(deps: { db: Knex }): DeleteUserNotifications =>
async ({ userId, ids }) => {
await tables
.userNotifications(deps.db)
.where({ userId })
.whereIn(UserNotifications.col.id, ids)
.delete()
}
export const markCommentNotificationsAsReadFactory =
(deps: { db: Knex }): MarkCommentNotificationAsRead =>
async ({ userId, commentId }) => {
const rows = await deps
.db(UserNotifications.name)
.where({ userId })
.andWhere(UserNotifications.col.type, NotificationType.MentionedInComment)
.whereJsonSupersetOf(UserNotifications.col.payload, { commentId })
.update({ read: true })
return rows
}
@@ -1,251 +0,0 @@
import { db } from '@/db/knex'
import type { GetComment } from '@/modules/comments/domain/operations'
import type { ExtendedComment } from '@/modules/comments/domain/types'
import type { CommentRecord } from '@/modules/comments/helpers/types'
import { getCommentFactory } from '@/modules/comments/repositories/comments'
import { ensureCommentSchema } from '@/modules/comments/services/commentTextService'
import type { GetServerInfo } from '@/modules/core/domain/server/operations'
import type { GetStream } from '@/modules/core/domain/streams/operations'
import type { StreamWithOptionalRole } from '@/modules/core/domain/streams/types'
import type { GetUser } from '@/modules/core/domain/users/operations'
import { Roles } from '@/modules/core/helpers/mainConstants'
import type { ServerInfo } from '@/modules/core/helpers/types'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import type { UserWithOptionalRole } from '@/modules/core/repositories/users'
import { getUserFactory } from '@/modules/core/repositories/users'
import { iterateContentNodes } from '@/modules/core/services/richTextEditorService'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import type {
GetUserPreferenceForNotificationType,
StoreUserNotifications
} from '@/modules/notifications/domain/operations'
import { NotificationValidationError } from '@/modules/notifications/errors'
import { NotificationChannel } from '@/modules/notifications/helpers/types'
import { storeUserNotificationsFactory } from '@/modules/notifications/repositories/userNotification'
import type { MaybeFalsy, Nullable } from '@/modules/shared/helpers/typeHelper'
import type { EventType } from '@/modules/shared/services/eventBus'
import type { JSONContent } from '@tiptap/core'
import cryptoRandomString from 'crypto-random-string'
import type { Knex } from 'knex'
import { difference } from 'lodash-es'
import { getUserPreferenceForNotificationTypeFactory } from '@/modules/notifications/services/notificationPreferences'
import { getSavedUserNotificationPreferencesFactory } from '@/modules/notifications/repositories/userNotificationPreferences'
import { NotificationType } from '@speckle/shared/notifications'
type ValidatedNotificationState = {
targetUser: UserWithOptionalRole
author: UserWithOptionalRole
stream: StreamWithOptionalRole
threadComment: ExtendedComment
mentionComment: ExtendedComment
commitOrObjectId: { commitId: Nullable<string>; objectId: Nullable<string> }
serverInfo: ServerInfo
}
function findMentionedUserIds(doc: JSONContent) {
const mentionedUserIds = new Set<string>()
for (const node of iterateContentNodes(doc)) {
if (node.type === 'mention') {
const uid = node.attrs?.id
if (uid) {
mentionedUserIds.add(uid)
}
}
}
return [...mentionedUserIds]
}
function collectMentionedUserIds(comment: CommentRecord): string[] {
if (!comment.text) return []
const { doc } = ensureCommentSchema(comment.text)
if (!doc) return []
return findMentionedUserIds(doc)
}
export const processCommentMentions = (
newComment: CommentRecord,
previousComment?: CommentRecord
) => {
const newMentionedUserIds = collectMentionedUserIds(newComment)
const previouslyMentionedUserIds = previousComment
? collectMentionedUserIds(previousComment)
: []
return difference(newMentionedUserIds, previouslyMentionedUserIds)
}
export function validateCommentNotification(state: {
targetUser: MaybeFalsy<UserWithOptionalRole>
author: MaybeFalsy<UserWithOptionalRole>
stream: MaybeFalsy<StreamWithOptionalRole>
threadComment: MaybeFalsy<ExtendedComment>
mentionComment: MaybeFalsy<ExtendedComment>
serverInfo: ServerInfo
}): ValidatedNotificationState {
const { targetUser, author, stream, threadComment, mentionComment, serverInfo } =
state
if (
!targetUser ||
targetUser.role === Roles.Server.ArchivedUser ||
!targetUser.email
) {
throw new NotificationValidationError('Invalid mention target user')
}
if (!author || author.role === Roles.Server.ArchivedUser) {
throw new NotificationValidationError('Invalid mention author user')
}
if (!stream) {
throw new NotificationValidationError('Invalid mention stream')
}
if (!threadComment || threadComment.streamId !== stream.id) {
throw new NotificationValidationError('Invalid mention thread comment')
}
if (!mentionComment || mentionComment.streamId !== stream.id) {
throw new NotificationValidationError('Invalid mention comment')
}
const commitOrObjectResource = threadComment.resources.find((r) =>
['commit', 'object'].includes(r.resourceType)
)
if (!commitOrObjectResource) {
// This will only happen if threadComment is actually a reply, so if the notification
// was emitted with wrong parameters
throw new NotificationValidationError(
"Couldn't resolve the comment's associated resource - the comment might be a reply"
)
}
const commitId =
commitOrObjectResource.resourceType === 'commit'
? commitOrObjectResource.resourceId
: null
const objectId =
commitOrObjectResource.resourceType === 'object'
? commitOrObjectResource.resourceId
: null
return {
targetUser,
author,
stream,
threadComment,
mentionComment,
commitOrObjectId: { commitId, objectId },
serverInfo
}
}
/**
* Notification that is triggered when a user is mentioned in a comment
*/
const createdOrUpdatedCommentHandlerFactory =
(deps: {
getUser: GetUser
getStream: GetStream
getCommentResolver: (deps: { projectDb: Knex }) => GetComment
getServerInfo: GetServerInfo
saveUserNotifications: StoreUserNotifications
getUserPreferenceForNotificationType: GetUserPreferenceForNotificationType
}) =>
async ({ payload }: EventType<'comments.created' | 'comments.updated'>) => {
const mentionedUserIds =
'comment' in payload
? processCommentMentions(payload.comment)
: processCommentMentions(payload.newComment, payload.previousComment)
for (const targetUserId of mentionedUserIds) {
const {
authorId,
streamId,
id: commentId,
parentComment
} = 'comment' in payload ? payload.comment : payload.newComment
// do not notify yourself
if (authorId === targetUserId) continue
const threadId = parentComment || commentId
const isCommentAndThreadTheSame = threadId === commentId
const projectDb = await getProjectDbClient({ projectId: streamId })
const getComment = deps.getCommentResolver({ projectDb })
const [targetUser, author, stream, threadComment, comment, serverInfo] =
await Promise.all([
deps.getUser(targetUserId),
deps.getUser(authorId),
deps.getStream({ streamId }),
getComment({ id: threadId }),
isCommentAndThreadTheSame ? null : getComment({ id: commentId }),
deps.getServerInfo()
])
const mentionComment = isCommentAndThreadTheSame ? threadComment : comment
// Validate message
const state = validateCommentNotification({
targetUser,
author,
stream,
threadComment,
mentionComment,
serverInfo
})
const isSubscribedToEmail = await deps.getUserPreferenceForNotificationType(
state.targetUser.id,
NotificationType.MentionedInComment,
NotificationChannel.Email
)
const now = new Date()
await deps.saveUserNotifications([
{
id: cryptoRandomString({ length: 10 }),
userId: state.targetUser.id,
type: NotificationType.MentionedInComment,
read: false,
version: '1',
payload: {
threadId: state.threadComment.id,
authorId: state.author.id,
commentId: state.mentionComment.id,
streamId: state.stream.id
},
sendEmailAt: isSubscribedToEmail ? now : null,
createdAt: now,
updatedAt: now
}
])
}
}
export const handler = async (
event: EventType<'comments.created' | 'comments.updated'>
) => {
const createdOrUpdatedCommentHandler = createdOrUpdatedCommentHandlerFactory({
getUser: getUserFactory({ db }),
getStream: getStreamFactory({ db }),
getCommentResolver: ({ projectDb }) => getCommentFactory({ db: projectDb }),
getServerInfo: getServerInfoFactory({ db }),
saveUserNotifications: storeUserNotificationsFactory({ db }),
getUserPreferenceForNotificationType: getUserPreferenceForNotificationTypeFactory({
getSavedUserNotificationPreferences: getSavedUserNotificationPreferencesFactory({
db
})
})
})
return createdOrUpdatedCommentHandler(event)
}
export default handler
@@ -1,157 +0,0 @@
import {
AccessRequestType,
getPendingAccessRequestFactory
} from '@/modules/accessrequests/repositories'
import { NotificationValidationError } from '@/modules/notifications/errors'
import { Roles } from '@/modules/core/helpers/mainConstants'
import { sendEmail } from '@/modules/emails/services/sending'
import { renderEmail } from '@/modules/emails/services/emailRendering'
import { db } from '@/db/knex'
import type { GetPendingAccessRequest } from '@/modules/accessrequests/domain/operations'
import type {
GetStream,
GetStreamCollaborators
} from '@/modules/core/domain/streams/operations'
import {
getStreamCollaboratorsFactory,
getStreamFactory
} from '@/modules/core/repositories/streams'
import type { GetUser } from '@/modules/core/domain/users/operations'
import { getUserFactory } from '@/modules/core/repositories/users'
import type { GetServerInfo } from '@/modules/core/domain/server/operations'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import type { EventBusPayloads, EventType } from '@/modules/shared/services/eventBus'
import type {
GetUserPreferenceForNotificationType,
StoreUserNotifications
} from '@/modules/notifications/domain/operations'
import type { UserNotificationRecord } from '@/modules/notifications/helpers/types'
import { NotificationChannel } from '@/modules/notifications/helpers/types'
import cryptoRandomString from 'crypto-random-string'
import { storeUserNotificationsFactory } from '@/modules/notifications/repositories/userNotification'
import { getUserPreferenceForNotificationTypeFactory } from '@/modules/notifications/services/notificationPreferences'
import { getSavedUserNotificationPreferencesFactory } from '@/modules/notifications/repositories/userNotificationPreferences'
import { NotificationType } from '@speckle/shared/notifications'
type ValidateMessageDeps = {
getPendingAccessRequest: GetPendingAccessRequest
getUser: GetUser
getStream: GetStream
getStreamCollaborators: GetStreamCollaborators
}
const validateMessageFactory =
(deps: ValidateMessageDeps) =>
async ({ payload }: { payload: EventBusPayloads['accessrequests.created'] }) => {
const {
request: { id: requestId, resourceId: streamId }
} = payload
if (!streamId) throw new NotificationValidationError('No stream ID provided')
const stream = await deps.getStream({ streamId })
if (!stream) throw new NotificationValidationError('Nonexistant stream')
const request = await deps.getPendingAccessRequest(
requestId,
AccessRequestType.Stream
)
if (!request)
throw new NotificationValidationError('Nonexistant stream access request')
const owners = await deps.getStreamCollaborators(streamId, Roles.Stream.Owner)
if (!owners.length) throw new NotificationValidationError('Stream has no owners')
const requester = await deps.getUser(request.requesterId)
if (!requester)
throw new NotificationValidationError(
'User who made the request no longer exists'
)
const targetUsers = []
for (const owner of owners) {
const [user, streamWithRole] = await Promise.all([
deps.getUser(owner.id),
deps.getStream({
streamId: request.resourceId,
userId: owner.id
})
])
if (!user) throw new NotificationValidationError('User no longer exists')
if (!streamWithRole) throw new NotificationValidationError('Nonexistant stream')
if (streamWithRole.role !== Roles.Stream.Owner)
throw new NotificationValidationError(
'Only stream owners can receive notifications about stream access requests'
)
targetUsers.push(user)
}
return {
request,
stream,
targetUsers,
requester
}
}
const streamAccessRequestCreatedHandlerFactory =
(
deps: {
getServerInfo: GetServerInfo
renderEmail: typeof renderEmail
sendEmail: typeof sendEmail
saveUserNotifications: StoreUserNotifications
getUserPreferenceForNotificationType: GetUserPreferenceForNotificationType
} & ValidateMessageDeps
) =>
async (event: EventType<'accessrequests.created'>) => {
const state = await validateMessageFactory(deps)(event)
const now = new Date()
const notifications: UserNotificationRecord[] = []
for (const targetUser of state.targetUsers) {
const isSubscribedToEmail = await deps.getUserPreferenceForNotificationType(
targetUser.id,
NotificationType.NewStreamAccessRequest,
NotificationChannel.Email
)
notifications.push({
id: cryptoRandomString({ length: 10 }),
userId: targetUser.id,
type: NotificationType.NewStreamAccessRequest,
read: false,
version: '1',
payload: {
streamId: state.stream.id,
requesterId: state.requester.id
},
sendEmailAt: isSubscribedToEmail ? now : null,
createdAt: now,
updatedAt: now
})
}
await deps.saveUserNotifications(notifications)
}
export const handler = (event: EventType<'accessrequests.created'>) => {
const streamAccessRequestCreatedHandler = streamAccessRequestCreatedHandlerFactory({
getServerInfo: getServerInfoFactory({ db }),
renderEmail,
sendEmail,
getUser: getUserFactory({ db }),
getStream: getStreamFactory({ db }),
getPendingAccessRequest: getPendingAccessRequestFactory({ db }),
getStreamCollaborators: getStreamCollaboratorsFactory({ db }),
saveUserNotifications: storeUserNotificationsFactory({ db }),
getUserPreferenceForNotificationType: getUserPreferenceForNotificationTypeFactory({
getSavedUserNotificationPreferences: getSavedUserNotificationPreferencesFactory({
db
})
})
})
return streamAccessRequestCreatedHandler(event)
}
export default handler
@@ -1,114 +0,0 @@
import { db } from '@/db/knex'
import type { GetStream } from '@/modules/core/domain/streams/operations'
import type { GetUser } from '@/modules/core/domain/users/operations'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { getUserFactory } from '@/modules/core/repositories/users'
import type {
GetUserPreferenceForNotificationType,
StoreUserNotifications
} from '@/modules/notifications/domain/operations'
import { NotificationValidationError } from '@/modules/notifications/errors'
import { NotificationChannel } from '@/modules/notifications/helpers/types'
import { NotificationType } from '@speckle/shared/notifications'
import { storeUserNotificationsFactory } from '@/modules/notifications/repositories/userNotification'
import type { EventBusPayloads, EventType } from '@/modules/shared/services/eventBus'
import type { Nullable } from '@speckle/shared'
import cryptoRandomString from 'crypto-random-string'
import { getUserPreferenceForNotificationTypeFactory } from '@/modules/notifications/services/notificationPreferences'
import { getSavedUserNotificationPreferencesFactory } from '@/modules/notifications/repositories/userNotificationPreferences'
type ValidateMessageDeps = {
getUser: GetUser
getStream: GetStream
}
const validateEventFactory =
(deps: ValidateMessageDeps) =>
async ({
targetUserId,
resourceId,
finalizedBy
}: {
targetUserId: string
resourceId: Nullable<string>
finalizedBy: string
}) => {
if (!resourceId) throw new NotificationValidationError('No stream provided')
const [targetUser, finalizer, stream] = await Promise.all([
deps.getUser(targetUserId),
deps.getUser(finalizedBy),
deps.getStream({ streamId: resourceId, userId: targetUserId })
])
if (!targetUser)
throw new NotificationValidationError('Invalid notification target user')
if (!finalizer)
throw new NotificationValidationError('Invalid notification finalizer')
if (!stream) throw new NotificationValidationError('Invalid stream')
if (!stream.role)
throw new NotificationValidationError(
'User doesnt appear to have a role on the stream'
)
return { targetUser, finalizer, stream }
}
const steamAccessRequestFinalizedHandlerFactory =
(
deps: {
saveUserNotifications: StoreUserNotifications
getUserPreferenceForNotificationType: GetUserPreferenceForNotificationType
} & ValidateMessageDeps
) =>
async (args: {
payload: EventBusPayloads['accessrequests.finalized'] // TODO: smarter typing
}) => {
const { approved, request, finalizedBy } = args.payload
// notify only approvals
if (!approved) return
const state = await validateEventFactory(deps)({
targetUserId: request.requesterId,
resourceId: request.resourceId,
finalizedBy
})
const isSubscribedToEmail = await deps.getUserPreferenceForNotificationType(
state.targetUser.id,
NotificationType.StreamAccessRequestApproved,
NotificationChannel.Email
)
const now = new Date()
await deps.saveUserNotifications([
{
id: cryptoRandomString({ length: 10 }),
userId: state.targetUser.id,
type: NotificationType.StreamAccessRequestApproved,
read: false,
version: '1',
payload: {
streamId: state.stream.id
},
sendEmailAt: isSubscribedToEmail ? now : null,
createdAt: now,
updatedAt: now
}
])
}
export const handler = async (event: EventType<'accessrequests.finalized'>) => {
const steamAccessRequestFinalizedHandler = steamAccessRequestFinalizedHandlerFactory({
getUser: getUserFactory({ db }),
getStream: getStreamFactory({ db }),
saveUserNotifications: storeUserNotificationsFactory({ db }),
getUserPreferenceForNotificationType: getUserPreferenceForNotificationTypeFactory({
getSavedUserNotificationPreferences: getSavedUserNotificationPreferencesFactory({
db
})
})
})
return steamAccessRequestFinalizedHandler(event)
}
export default handler
@@ -1,102 +0,0 @@
import { UninitializedResourceAccessError } from '@/modules/shared/errors'
import type { Optional } from '@/modules/shared/helpers/typeHelper'
import { getRedisUrl, isProdEnv, isTestEnv } from '@/modules/shared/helpers/envHelper'
import type Bull from 'bull'
import { initializeQueue as setupQueue } from '@speckle/shared/queue'
import { TIME_MS } from '@speckle/shared'
import type { NotificationEvents } from '@/modules/notifications/events/notificationListener'
import { notificationsLogger, Observability } from '@/observability/logging'
import { UnhandledNotificationError } from '@/modules/notifications/errors'
import CreatedOrUpdatedCommentHandler from '@/modules/notifications/services/events/handlers/createdOrUpdatedComment'
import StreamAccessRequestCreatedHandler from '@/modules/notifications/services/events/handlers/streamAccessRequestCreated'
import StreamAccessRequestFinalizedHandler from '@/modules/notifications/services/events/handlers/streamAccessRequestFinalized'
import { CommentEvents } from '@/modules/comments/domain/events'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'
export const NOTIFICATION_EVENTS_QUEUE = 'default:user-event-notifications'
let queue: Optional<Bull.Queue>
export const buildNotificationEventsQueue = async (queueName: string) =>
await setupQueue({
queueName,
redisUrl: getRedisUrl(),
options: {
...(!isTestEnv()
? {
limiter: {
max: 10,
duration: TIME_MS.second
}
}
: {}),
defaultJobOptions: {
attempts: 1,
timeout: 10 * TIME_MS.second,
removeOnComplete: isProdEnv(),
removeOnFail: isProdEnv()
}
}
})
export function getQueue(): Bull.Queue {
if (!queue) {
throw new UninitializedResourceAccessError(
'Attempting to use uninitialized Bull queue'
)
}
return queue
}
export async function initializeNotificationEventsQueue() {
queue = await buildNotificationEventsQueue(NOTIFICATION_EVENTS_QUEUE)
}
export async function initializeNotificationEventsConsumption() {
const queue = getQueue()
void queue.process(async ({ data: event }: Bull.Job<NotificationEvents>) => {
const notificationLogger = Observability.extendLoggerComponent(
notificationsLogger,
event.eventName
)
try {
notificationLogger.info('Handling notifications for event')
switch (event.eventName) {
case CommentEvents.Created:
case CommentEvents.Updated:
await CreatedOrUpdatedCommentHandler(event)
break
case AccessRequestEvents.Created:
await StreamAccessRequestCreatedHandler(event)
break
case AccessRequestEvents.Finalized:
await StreamAccessRequestFinalizedHandler(event)
break
default:
throw new UnhandledNotificationError(null, { info: event })
}
notificationLogger.info('Handled notifications for event')
} catch (e: unknown) {
notificationsLogger.error(e)
}
})
}
/**
* Publish message onto the notifications queue
*/
export async function publishEventMessage(message: NotificationEvents) {
const queue = getQueue()
const job = await queue.add(message)
return job.id
}
export async function shutdownEventQueue() {
if (!queue) return
await queue.close()
}
@@ -17,7 +17,7 @@ import path from 'path'
import * as ejs from 'ejs'
import { renderEmail } from '@/modules/emails/services/emailRendering'
import { getUserNotificationPreferencesFactory } from '@/modules/notifications/services/notificationPreferences'
import { getSavedUserNotificationPreferencesFactory } from '@/modules/notifications/repositories/userNotificationPreferences'
import { getSavedUserNotificationPreferencesFactory } from '@/modules/notifications/repositories'
import { db } from '@/db/knex'
import type { GetUserNotificationPreferences } from '@/modules/notifications/domain/operations'
import type { CreateActivitySummary } from '@/modules/activitystream/domain/operations'
@@ -116,7 +116,7 @@ function buildEmailTemplateMjml(
Hello,<br/>
<br/>
<b>${author.name}</b> has just mentioned you in a comment on the <b>${stream.name}</b> project.
Please click on the button below to see the comment.
Please click on the button below to see the comment.
</mj-text>
`,
bodyEnd: `<br/><br/>`
@@ -130,7 +130,7 @@ function buildEmailTemplateText(
return {
bodyStart: `Hello
${author.name} has just mentioned you in a comment on the ${stream.name} project.
Please open the link below to see the comment.`,
bodyEnd: undefined
@@ -1,13 +1,14 @@
import type { NotificationPreferences } from '@/modules/notifications/helpers/types'
import { NotificationChannel } from '@/modules/notifications/helpers/types'
import {
NotificationChannel,
NotificationType
} from '@/modules/notifications/helpers/types'
import { InvalidArgumentError } from '@/modules/shared/errors'
import type {
GetSavedUserNotificationPreferences,
GetUserNotificationPreferences,
GetUserPreferenceForNotificationType,
SaveUserNotificationPreferences
} from '@/modules/notifications/domain/operations'
import { NotificationType } from '@speckle/shared/notifications'
export const getUserNotificationPreferencesFactory =
(deps: {
@@ -34,18 +35,6 @@ function addDefaultPreferenceValues(
return savedPreferences
}
export const getUserPreferenceForNotificationTypeFactory =
(deps: {
getSavedUserNotificationPreferences: GetSavedUserNotificationPreferences
}): GetUserPreferenceForNotificationType =>
async (userId, notificationType, notificationChannel) => {
const preferences = await deps.getSavedUserNotificationPreferences(userId)
if (!preferences) return true
const notificationTypeSettings = preferences[notificationType]
return notificationTypeSettings?.[notificationChannel] ?? false
}
export const updateNotificationPreferencesFactory =
(deps: { saveUserNotificationPreferences: SaveUserNotificationPreferences }) =>
async (userId: string, rawPreferences: Record<string, unknown>): Promise<void> => {
@@ -2,12 +2,10 @@ import type {
NotificationPublisher,
NotificationTypeMessageMap
} from '@/modules/notifications/helpers/types'
import { publishMessage } from '@/modules/notifications/services/publication/queue'
import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper'
import { publishMessage } from '@/modules/notifications/services/queue'
/**
* Publish a notification
* @deprecated new implementations should be built using the notificationListener handlers
*/
export const publishNotification: NotificationPublisher = async (type, params) => {
const msg = {
@@ -15,9 +13,5 @@ export const publishNotification: NotificationPublisher = async (type, params) =
...params
} as NotificationTypeMessageMap[typeof type]
// return is only consumed by specs
// this satisfies ts
if (isNotificationListenerEnabled()) return -1
return await publishMessage(msg)
}
@@ -8,6 +8,7 @@ import {
import type {
NotificationHandler,
NotificationMessage,
NotificationType,
NotificationTypeHandlers
} from '@/modules/notifications/helpers/types'
import { isNotificationMessage } from '@/modules/notifications/helpers/types'
@@ -20,7 +21,6 @@ import { ensureErrorOrWrapAsCause } from '@/modules/shared/errors/ensureError'
import { TIME_MS } from '@speckle/shared'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { NotificationsEvents } from '@/modules/notifications/domain/events'
import type { NotificationType } from '@speckle/shared/notifications'
export type NotificationJobResult = {
status: NotificationJobResultsStatus
@@ -87,7 +87,7 @@ export function getQueue(): Bull.Queue {
/**
* Initialize notifications queue
*/
export async function initializePublicationQueue() {
export async function initializeQueue() {
queue = await buildNotificationsQueue(NOTIFICATIONS_QUEUE)
}
@@ -186,7 +186,7 @@ export async function consumeIncomingNotifications() {
})
}
export async function shutdownPublicationQueue() {
export async function shutdownQueue() {
if (!queue) return
await queue.close()
}
@@ -1,105 +0,0 @@
import { db } from '@/db/knex'
import {
acquireTaskLockFactory,
releaseTaskLockFactory
} from '@/modules/core/repositories/scheduledTasks'
import { scheduleExecutionFactory } from '@/modules/core/services/taskScheduler'
import type { Logger } from '@/observability/logging'
import {
getNextEmailNotificationFactory,
updateUserNotificationFactory
} from '@/modules/notifications/repositories/userNotification'
import { NotificationType } from '@speckle/shared/notifications'
import MentionedInCommentHandler from '@/modules/notifications/tasks/handlers/mentionedInComment'
import StreamAccessRequestApprovedHandler from '@/modules/notifications/tasks/handlers/streamAccessRequestApproved'
import NewStreamAccessRequestHandler from '@/modules/notifications/tasks/handlers/newStreamAccessRequest'
import { ensureNotificationToLatestVersion } from '@/modules/notifications/helpers/toLatestVersion'
type EmailNotificationResult = { notificationId: string } | null
const handleNextEmailNotification = async (deps: {
logger: Logger
}): Promise<EmailNotificationResult> =>
db.transaction(async (trx) => {
const baseNotification = await getNextEmailNotificationFactory({ db: trx })()
if (!baseNotification) return null
const notification = ensureNotificationToLatestVersion(baseNotification)
if (!notification) return null
try {
switch (notification.type) {
case NotificationType.MentionedInComment:
await MentionedInCommentHandler(notification)
break
case NotificationType.StreamAccessRequestApproved:
await StreamAccessRequestApprovedHandler(notification)
break
case NotificationType.NewStreamAccessRequest:
await NewStreamAccessRequestHandler(notification)
break
default:
deps.logger.error(
{
type: notification.type,
notificationId: notification.id
},
`No handler scheduled notification type. Skipping.`
)
break
}
} catch (error) {
deps.logger.error(
{
error,
type: notification.type,
notificationId: notification.id
},
`Error handling notification. Skipping.`
)
}
await updateUserNotificationFactory({ db: trx })({
id: notification.id,
userId: notification.userId,
update: {
sendEmailAt: null,
updatedAt: new Date()
}
})
return { notificationId: notification.id }
})
export const emitDelayedEmailNotifications = async (deps: { logger: Logger }) => {
let result: EmailNotificationResult
const MAX_ITERATIONS = 10_000
let iterationCount = 0
do {
if (iterationCount++ >= MAX_ITERATIONS) {
deps.logger.error(`Reached max iteration limit of ${MAX_ITERATIONS}.`)
break
}
result = await handleNextEmailNotification(deps)
} while (result)
}
export const scheduleDelayedEmailNotifications = async () => {
const scheduleExecution = scheduleExecutionFactory({
acquireTaskLock: acquireTaskLockFactory({ db }),
releaseTaskLock: releaseTaskLockFactory({ db })
})
const everyMin = '*/1 * * * *'
return scheduleExecution(
everyMin,
'DelayedEmailNotifications',
async (_scheduledTime, { logger }) => {
await emitDelayedEmailNotifications({
logger
})
}
)
}
@@ -1,150 +0,0 @@
import { db } from '@/db/knex'
import type { GetComment } from '@/modules/comments/domain/operations'
import { getCommentFactory } from '@/modules/comments/repositories/comments'
import type { GetServerInfo } from '@/modules/core/domain/server/operations'
import type { GetStream } from '@/modules/core/domain/streams/operations'
import type { GetUser } from '@/modules/core/domain/users/operations'
import { getCommentRoute } from '@/modules/core/helpers/routeHelper'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { getUserFactory } from '@/modules/core/repositories/users'
import type { EmailTemplateParams } from '@/modules/emails/domain/operations'
import { renderEmail } from '@/modules/emails/services/emailRendering'
import { sendEmail } from '@/modules/emails/services/sending'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import type { UserNotificationRecord } from '@/modules/notifications/helpers/types'
import type { NotificationType } from '@speckle/shared/notifications'
import { getFrontendOrigin } from '@/modules/shared/helpers/envHelper'
import type { Knex } from 'knex'
import { validateCommentNotification } from '@/modules/notifications/services/events/handlers/createdOrUpdatedComment'
type MentionedInCommentNotification = Extract<
UserNotificationRecord,
{ type: NotificationType.MentionedInComment }
>
function buildEmailTemplateMjml(
state: ReturnType<typeof validateCommentNotification>
): EmailTemplateParams['mjml'] {
const { author, stream } = state
return {
bodyStart: `
<mj-text align="center" line-height="2" >
Hello,<br/>
<br/>
<b>${author.name}</b> has just mentioned you in a comment on the <b>${stream.name}</b> project.
Please click on the button below to see the comment.
</mj-text>
`,
bodyEnd: `<br/><br/>`
}
}
function buildEmailTemplateText(
state: ReturnType<typeof validateCommentNotification>
): EmailTemplateParams['text'] {
const { author, stream } = state
return {
bodyStart: `Hello
${author.name} has just mentioned you in a comment on the ${stream.name} project.
Please open the link below to see the comment.`,
bodyEnd: undefined
}
}
function buildEmailTemplateParams(
state: ReturnType<typeof validateCommentNotification>
): EmailTemplateParams {
const {
commitOrObjectId: { objectId, commitId },
stream,
threadComment
} = state
const commentRoute = getCommentRoute(stream.id, threadComment.id, {
objectId,
commitId
})
const url = new URL(commentRoute, getFrontendOrigin()).toString()
return {
mjml: buildEmailTemplateMjml(state),
text: buildEmailTemplateText(state),
cta: {
url,
title: 'View comment thread'
}
}
}
/**
* Notification that is triggered when a user is mentioned in a comment
*/
const mentionedInCommentEmailHandlerFactory =
(deps: {
getUser: GetUser
getStream: GetStream
getCommentResolver: (deps: { projectDb: Knex }) => GetComment
getServerInfo: GetServerInfo
renderEmail: typeof renderEmail
sendEmail: typeof sendEmail
}) =>
async (notification: MentionedInCommentNotification) => {
const { threadId, commentId, authorId, streamId } = notification.payload
const isCommentAndThreadTheSame = threadId === commentId
const projectDb = await getProjectDbClient({ projectId: streamId })
const getComment = deps.getCommentResolver({ projectDb })
const [targetUser, author, stream, threadComment, comment, serverInfo] =
await Promise.all([
deps.getUser(notification.userId),
deps.getUser(authorId),
deps.getStream({ streamId }),
getComment({ id: threadId }),
isCommentAndThreadTheSame ? null : getComment({ id: commentId }),
deps.getServerInfo()
])
const mentionComment = isCommentAndThreadTheSame ? threadComment : comment
// Validate message
const state = validateCommentNotification({
targetUser,
author,
stream,
threadComment,
mentionComment,
serverInfo
})
const templateParams = buildEmailTemplateParams(state)
const { text, html } = await deps.renderEmail(
templateParams,
serverInfo,
targetUser
)
await deps.sendEmail({
to: state.targetUser.email,
text,
html,
subject: "You've just been mentioned in a Speckle comment"
})
}
export const handler = async (notification: MentionedInCommentNotification) => {
const mentionedInCommentHandler = mentionedInCommentEmailHandlerFactory({
getUser: getUserFactory({ db }),
getStream: getStreamFactory({ db }),
getCommentResolver: ({ projectDb }) => getCommentFactory({ db: projectDb }),
getServerInfo: getServerInfoFactory({ db }),
renderEmail,
sendEmail
})
return mentionedInCommentHandler(notification)
}
export default handler
@@ -1,170 +0,0 @@
import { getPendingAccessRequestFactory } from '@/modules/accessrequests/repositories'
import { NotificationValidationError } from '@/modules/notifications/errors'
import { Roles } from '@/modules/core/helpers/mainConstants'
import {
buildAbsoluteFrontendUrlFromPath,
getStreamCollaboratorsRoute
} from '@/modules/core/helpers/routeHelper'
import { sendEmail } from '@/modules/emails/services/sending'
import { renderEmail } from '@/modules/emails/services/emailRendering'
import { db } from '@/db/knex'
import type { GetPendingAccessRequest } from '@/modules/accessrequests/domain/operations'
import type {
GetStream,
GetStreamCollaborators
} from '@/modules/core/domain/streams/operations'
import {
getStreamCollaboratorsFactory,
getStreamFactory
} from '@/modules/core/repositories/streams'
import type { GetUser } from '@/modules/core/domain/users/operations'
import { getUserFactory } from '@/modules/core/repositories/users'
import type { GetServerInfo } from '@/modules/core/domain/server/operations'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import type { EmailTemplateParams } from '@/modules/emails/domain/operations'
import type { StoreUserNotifications } from '@/modules/notifications/domain/operations'
import type { UserNotificationRecord } from '@/modules/notifications/helpers/types'
import type { NotificationType } from '@speckle/shared/notifications'
import { storeUserNotificationsFactory } from '@/modules/notifications/repositories/userNotification'
type ValidateMessageDeps = {
getPendingAccessRequest: GetPendingAccessRequest
getUser: GetUser
getStream: GetStream
getStreamCollaborators: GetStreamCollaborators
}
type NewSreamAccessRequestNotification = Extract<
UserNotificationRecord,
{ type: NotificationType.NewStreamAccessRequest }
>
const validateMessageFactory =
(deps: ValidateMessageDeps) =>
async (notification: NewSreamAccessRequestNotification) => {
const { streamId, requesterId } = notification.payload
const userId = notification.userId
if (!streamId) throw new NotificationValidationError('No stream ID provided')
const stream = await deps.getStream({ streamId })
if (!stream) throw new NotificationValidationError('Nonexistant stream')
const requester = await deps.getUser(requesterId)
if (!requester)
throw new NotificationValidationError(
'User who made the request no longer exists'
)
const [targetUser, streamWithRole] = await Promise.all([
deps.getUser(userId),
deps.getStream({
streamId,
userId
})
])
if (!targetUser) throw new NotificationValidationError('User no longer exists')
if (!streamWithRole) throw new NotificationValidationError('Nonexistant stream')
if (streamWithRole.role !== Roles.Stream.Owner)
throw new NotificationValidationError(
'Only stream owners can receive notifications about stream access requests'
)
return {
stream,
targetUser,
requester
}
}
type ValidatedMessageState = Awaited<
ReturnType<ReturnType<typeof validateMessageFactory>>
>
function buildEmailTemplateHtml(
state: ValidatedMessageState
): EmailTemplateParams['mjml'] {
const { requester, stream } = state
return {
bodyStart: `<mj-text align="center" line-height="2" >
Hello,<br/>
<br/>
<b>${requester.name}</b> requested access to the <b>${stream.name}</b> project.
You can add them as a collaborator by clicking the button below.
</mj-text>
`,
bodyEnd: `<mj-text align="center" padding-bottom="0px" line-height="2">
You received this email because you are an owner on <b>${stream.name}</b>.
</mj-text>`
}
}
function buildEmailTemplateText(
state: ValidatedMessageState
): EmailTemplateParams['text'] {
const { requester, stream } = state
return {
bodyStart: `Hello,\n\n${requester.name} requested access to the ${stream.name} project. You can add them as a collaborator by opening the link below.`,
bodyEnd: `You received this email because you are an owner on ${stream.name}`
}
}
function buildEmailTemplateParams(state: ValidatedMessageState): EmailTemplateParams {
const { stream } = state
return {
mjml: buildEmailTemplateHtml(state),
text: buildEmailTemplateText(state),
cta: {
title: 'Review Request',
url: buildAbsoluteFrontendUrlFromPath(getStreamCollaboratorsRoute(stream.id))
}
}
}
const newSreamAccessRequestHandlerFactory =
(
deps: {
getServerInfo: GetServerInfo
renderEmail: typeof renderEmail
sendEmail: typeof sendEmail
saveUserNotifications: StoreUserNotifications
} & ValidateMessageDeps
) =>
async (notification: NewSreamAccessRequestNotification) => {
const state = await validateMessageFactory(deps)(notification)
const htmlTemplateParams = buildEmailTemplateParams(state)
const serverInfo = await deps.getServerInfo()
const { html, text } = await deps.renderEmail(
htmlTemplateParams,
serverInfo,
state.targetUser
)
await deps.sendEmail({
to: state.targetUser.email,
text,
html,
subject: 'A user requested access to your project'
})
}
export const handler = (notification: NewSreamAccessRequestNotification) => {
const streamAccessRequestCreatedHandler = newSreamAccessRequestHandlerFactory({
getServerInfo: getServerInfoFactory({ db }),
renderEmail,
sendEmail,
getUser: getUserFactory({ db }),
getStream: getStreamFactory({ db }),
getPendingAccessRequest: getPendingAccessRequestFactory({ db }),
getStreamCollaborators: getStreamCollaboratorsFactory({ db }),
saveUserNotifications: storeUserNotificationsFactory({ db })
})
return streamAccessRequestCreatedHandler(notification)
}
export default handler
@@ -1,134 +0,0 @@
import { db } from '@/db/knex'
import type { GetServerInfo } from '@/modules/core/domain/server/operations'
import type { GetStream } from '@/modules/core/domain/streams/operations'
import type { GetUser } from '@/modules/core/domain/users/operations'
import {
buildAbsoluteFrontendUrlFromPath,
getStreamRoute
} from '@/modules/core/helpers/routeHelper'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { getUserFactory } from '@/modules/core/repositories/users'
import type { EmailTemplateParams } from '@/modules/emails/domain/operations'
import { renderEmail } from '@/modules/emails/services/emailRendering'
import { sendEmail } from '@/modules/emails/services/sending'
import { NotificationValidationError } from '@/modules/notifications/errors'
import type { UserNotificationRecord } from '@/modules/notifications/helpers/types'
import type { NotificationType } from '@speckle/shared/notifications'
type ValidateMessageDeps = {
getUser: GetUser
getStream: GetStream
}
type StreamAccessApprovedNotification = Extract<
UserNotificationRecord,
{ type: NotificationType.StreamAccessRequestApproved }
>
const validateNotificationFactory =
(deps: ValidateMessageDeps) =>
async (notification: StreamAccessApprovedNotification) => {
const streamId = notification.payload.streamId
if (!streamId) throw new NotificationValidationError('No stream provided')
const [targetUser, stream] = await Promise.all([
deps.getUser(notification.userId),
deps.getStream({ streamId, userId: notification.userId })
])
if (!targetUser)
throw new NotificationValidationError('Invalid notification target user')
if (!stream) throw new NotificationValidationError('Invalid stream')
if (!stream.role)
throw new NotificationValidationError(
'User doesnt appear to have a role on the stream'
)
return { targetUser, stream }
}
type ValidatedMessageState = Awaited<
ReturnType<ReturnType<typeof validateNotificationFactory>>
>
function buildEmailTemplateMjml(
state: ValidatedMessageState
): EmailTemplateParams['mjml'] {
const { stream } = state
return {
bodyStart: `<mj-text align="center" line-height="2" >
Hello,<br/>
<br/>
You have just been granted access to the <b>${stream.name}</b> project. Check it out below:
</mj-text>
`,
bodyEnd: `<mj-text align="center" line-height="2" >
You received this email because you requested access to this project
</mj-text>`
}
}
function buildEmailTemplateText(
state: ValidatedMessageState
): EmailTemplateParams['text'] {
const { stream } = state
return {
bodyStart: `Hello,\n\nYou have just been granted access to the ${stream.name} stream. Check it below:`,
bodyEnd: `You received this email because you requested access to this stream`
}
}
function buildEmailTemplateParams(state: ValidatedMessageState): EmailTemplateParams {
const { stream } = state
return {
mjml: buildEmailTemplateMjml(state),
text: buildEmailTemplateText(state),
cta: {
title: 'View Stream',
url: buildAbsoluteFrontendUrlFromPath(getStreamRoute(stream.id))
}
}
}
const steamAccessRequestFinalizedHandlerFactory =
(
deps: {
getServerInfo: GetServerInfo
renderEmail: typeof renderEmail
sendEmail: typeof sendEmail
} & ValidateMessageDeps
) =>
async (notification: StreamAccessApprovedNotification) => {
const state = await validateNotificationFactory(deps)(notification)
const htmlTemplateParams = buildEmailTemplateParams(state)
const serverInfo = await deps.getServerInfo()
const { html, text } = await deps.renderEmail(
htmlTemplateParams,
serverInfo,
state.targetUser
)
await deps.sendEmail({
to: state.targetUser.email,
text,
html,
subject: 'Your project access request has been approved'
})
}
export const handler = async (notification: StreamAccessApprovedNotification) => {
const steamAccessRequestFinalizedHandler = steamAccessRequestFinalizedHandlerFactory({
getServerInfo: getServerInfoFactory({ db }),
renderEmail,
sendEmail,
getUser: getUserFactory({ db }),
getStream: getStreamFactory({ db })
})
return steamAccessRequestFinalizedHandler(notification)
}
export default handler
@@ -16,7 +16,7 @@ import { renderEmail } from '@/modules/emails/services/emailRendering'
import type {
DigestTopic,
Digest
} from '@/modules/notifications/services/publication/handlers/activityDigest'
} from '@/modules/notifications/services/handlers/activityDigest'
import {
digestMostActiveStream,
mostActiveComment,
@@ -26,7 +26,7 @@ import {
digestActiveStreams,
closingOverview,
prepareSummaryEmailFactory
} from '@/modules/notifications/services/publication/handlers/activityDigest'
} from '@/modules/notifications/services/handlers/activityDigest'
import { expect } from 'chai'
import { range } from 'lodash-es'
@@ -1,35 +0,0 @@
import { db } from '@/db/knex'
import { type UserNotificationRecord } from '@/modules/notifications/helpers/types'
import { NotificationType } from '@speckle/shared/notifications'
import { storeUserNotificationsFactory } from '@/modules/notifications/repositories/userNotification'
import cryptoRandomString from 'crypto-random-string'
import { assign } from 'lodash-es'
export const buildTestNotification = (
overrides?: Partial<UserNotificationRecord>
): UserNotificationRecord =>
assign(
{
id: cryptoRandomString({ length: 10 }),
userId: cryptoRandomString({ length: 10 }),
type: NotificationType.MentionedInComment,
version: '1',
read: false,
payload: {},
sendEmailAt: null,
createdAt: new Date(),
updatedAt: new Date()
},
overrides
)
export const createTestNotification = async (
notification?: UserNotificationRecord
): Promise<UserNotificationRecord> => {
const storeUserNotifications = storeUserNotificationsFactory({ db })
const storeNotification = notification || buildTestNotification()
await storeUserNotifications([storeNotification])
return storeNotification
}
@@ -1,184 +0,0 @@
import { db } from '@/db/knex'
import { UserNotifications } from '@/modules/core/dbSchema'
import {
GetUserNotificationsDocument,
UserBulkDeleteNotidicationDocument,
UserBulkUpdateNotificationsDocument
} from '@/modules/core/graph/generated/graphql'
import { getUserNotificationsFactory } from '@/modules/notifications/repositories/userNotification'
import {
buildTestNotification,
createTestNotification
} from '@/modules/notifications/tests/helpers'
import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper'
import type { BasicTestUser } from '@/test/authHelper'
import { createTestUser } from '@/test/authHelper'
import type { TestApolloServer } from '@/test/graphqlHelper'
import { testApolloServer } from '@/test/graphqlHelper'
import { beforeEachContext, truncateTables } from '@/test/hooks'
import { expect } from 'chai'
import { times } from 'lodash-es'
isNotificationListenerEnabled()
? describe('Notifications GQL', () => {
let apollo: TestApolloServer
let user: BasicTestUser
let anotherUser: BasicTestUser
before(async () => {
await beforeEachContext()
user = await createTestUser()
anotherUser = await createTestUser()
apollo = await testApolloServer({ authUserId: user.id })
})
beforeEach(async () => {
await truncateTables([UserNotifications.name])
})
it('pulls only your notifications', async () => {
await createTestNotification(
buildTestNotification({
userId: user.id
})
)
await createTestNotification(
buildTestNotification({
userId: anotherUser.id
})
)
await createTestNotification(
buildTestNotification({
userId: user.id
})
)
const { data } = await apollo.execute(
GetUserNotificationsDocument,
{},
{ assertNoErrors: true }
)
expect(data?.activeUser?.notifications.items).to.have.lengthOf(2)
})
it('paginates your notifications', async () => {
await Promise.all(
times(50).map(async () =>
createTestNotification(
buildTestNotification({
userId: user.id
})
)
)
)
const { data } = await apollo.execute(
GetUserNotificationsDocument,
{ limit: 10 },
{ assertNoErrors: true }
)
expect(data?.activeUser?.notifications.items).to.have.lengthOf(10)
expect(data?.activeUser?.notifications.cursor).to.be.a('string')
expect(data?.activeUser?.notifications.totalCount).to.be.eq(50)
})
it('allows deleting only your notifications', async () => {
const n1 = await createTestNotification(
buildTestNotification({
userId: user.id
})
)
const n2 = await createTestNotification(
buildTestNotification({
userId: anotherUser.id
})
)
const n3 = await createTestNotification(
buildTestNotification({
userId: user.id
})
)
await apollo.execute(
UserBulkDeleteNotidicationDocument,
{
ids: [n1.id, n2.id, n3.id] // n2 shouldn't be deleted
},
{ assertNoErrors: true }
)
const { data } = await apollo.execute(
GetUserNotificationsDocument,
{},
{ assertNoErrors: true }
)
const otherNotifications = await getUserNotificationsFactory({ db })({
userId: anotherUser.id,
cursor: null,
limit: null
})
expect(data?.activeUser?.notifications.totalCount).to.be.equal(0)
expect(data?.activeUser?.notifications.items).to.have.lengthOf(0)
expect(otherNotifications.items).to.have.lengthOf(1)
})
it('allows updating read field in the notification', async () => {
const n1 = await createTestNotification(
buildTestNotification({
userId: user.id,
read: false
})
)
const n2 = await createTestNotification(
buildTestNotification({
userId: anotherUser.id,
read: false
})
)
const n3 = await createTestNotification(
buildTestNotification({
userId: user.id,
read: false
})
)
await apollo.execute(
UserBulkUpdateNotificationsDocument,
{
input: [
{
id: n1.id,
read: true
},
{
id: n2.id,
read: true // n2 shouldn't be updated
},
{
id: n3.id,
read: true
}
]
},
{ assertNoErrors: true }
)
const { data } = await apollo.execute(
GetUserNotificationsDocument,
{},
{ assertNoErrors: true }
)
const otherNotifications = await getUserNotificationsFactory({ db })({
userId: anotherUser.id,
cursor: null,
limit: null
})
expect(data?.activeUser?.notifications.items[0].read).to.be.true
expect(data?.activeUser?.notifications.items[1].read).to.be.true
expect(otherNotifications.items[0].read).to.be.false
})
})
: {}
@@ -0,0 +1,130 @@
import type { MentionedInCommentData } from '@/modules/notifications/helpers/types'
import { NotificationType } from '@/modules/notifications/helpers/types'
import { publishNotification } from '@/modules/notifications/services/publication'
import type { NotificationsStateManager } from '@/test/notificationsHelper'
import {
buildNotificationsStateTracker,
purgeNotifications
} from '@/test/notificationsHelper'
import { expect } from 'chai'
import {
InvalidNotificationError,
NotificationValidationError,
UnhandledNotificationError
} from '@/modules/notifications/errors'
import { NotificationJobResultsStatus } from '@/modules/notifications/services/queue'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { NotificationsEvents } from '@/modules/notifications/domain/events'
describe('Notifications', () => {
let notificationsState: NotificationsStateManager
before(async () => {
await purgeNotifications()
notificationsState = await buildNotificationsStateTracker()
})
after(async () => {
notificationsState.destroy()
})
afterEach(() => {
notificationsState.reset()
})
it('can be emitted and routed to proper handler on consumption', async () => {
const targetUserId = '1234555'
const data: MentionedInCommentData = {
threadId: 'aaa',
commentId: 'bbb',
authorId: 'ccc',
streamId: 'ddd'
}
// Enqueue notification
const msgId = await publishNotification(NotificationType.MentionedInComment, {
targetUserId,
data
})
// Wait for ack
await notificationsState.waitForMsgAck(msgId)
const enqueuedMessage = notificationsState.collectedMessages().at(-1)!
expect(enqueuedMessage).to.be.ok
expect(enqueuedMessage?.targetUserId).to.eq(targetUserId)
expect(enqueuedMessage?.type).to.eq(NotificationType.MentionedInComment)
expect(enqueuedMessage?.data).to.deep.equalInAnyOrder(data)
})
it('fail safely when emitted with an unexpected structure', async () => {
// Enqueue notification with invalid structure
const msgId = await publishNotification(NotificationType.MentionedInComment, {
a: 1,
b: 2
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} as any)
const { err } = await notificationsState.waitForMsgAck(msgId)
expect(err).to.be.ok
expect(err instanceof InvalidNotificationError).to.be.true
expect(err?.message).to.contain('invalid notification')
})
it('fail safely when emitted with an unexpected type', async () => {
// Enqueue notification with invalid structure
const msgId = await publishNotification('booooooooo' as NotificationType, {
targetUserId: '123',
data: {
a: 123
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} as any
})
const { err } = await notificationsState.waitForMsgAck(msgId)
expect(err).to.be.ok
expect(err instanceof UnhandledNotificationError).to.be.true
})
const validationErrorDataSet = [
{
display: 'successful',
error: new NotificationValidationError('expected validation isue')
},
{ display: 'unsuccessful', error: new Error('ooohhhh') }
]
validationErrorDataSet.forEach(({ display, error }) => {
it(`fail with ${display} ack when handler throws ${error.name}`, async () => {
const data: MentionedInCommentData = {
threadId: 'aaa',
commentId: 'bbb',
authorId: 'ccc',
streamId: 'ddd'
}
getEventBus().listenOnce(NotificationsEvents.Received, () => {
throw error
})
const msgId = await publishNotification(NotificationType.MentionedInComment, {
targetUserId: '123',
data
})
const { err, result } = await notificationsState.waitForMsgAck(msgId)
const isValidationError = error instanceof NotificationValidationError
if (isValidationError) {
expect(err).to.be.not.ok
expect(result?.status).to.eq(NotificationJobResultsStatus.ValidationError)
} else {
expect(err).to.be.ok
expect(err?.name).to.eq(error.name)
expect(err?.message).to.eq(error.message)
expect(result).to.be.not.ok
}
})
})
})
@@ -3,8 +3,10 @@ import { UserNotificationPreferences, Users } from '@/modules/core/dbSchema'
import type { BasicTestUser } from '@/test/authHelper'
import { createTestUsers } from '@/test/authHelper'
import { expect } from 'chai'
import { NotificationType } from '@speckle/shared/notifications'
import { NotificationChannel } from '@/modules/notifications/helpers/types'
import {
NotificationType,
NotificationChannel
} from '@/modules/notifications/helpers/types'
import { BaseError } from '@/modules/shared/errors'
import {
getUserNotificationPreferencesFactory,
@@ -13,7 +15,7 @@ import {
import {
getSavedUserNotificationPreferencesFactory,
saveUserNotificationPreferencesFactory
} from '@/modules/notifications/repositories/userNotificationPreferences'
} from '@/modules/notifications/repositories'
import { db } from '@/db/knex'
const getSavedUserNotificationPreferences = getSavedUserNotificationPreferencesFactory({
@@ -1,133 +0,0 @@
import type { MentionedInCommentData } from '@/modules/notifications/helpers/types'
import { NotificationType } from '@speckle/shared/notifications'
import { publishNotification } from '@/modules/notifications/services/publication/publishNotification'
import type { NotificationsStateManager } from '@/test/notificationsHelper'
import {
buildNotificationsStateTracker,
purgeNotifications
} from '@/test/notificationsHelper'
import { expect } from 'chai'
import {
InvalidNotificationError,
NotificationValidationError,
UnhandledNotificationError
} from '@/modules/notifications/errors'
import { NotificationJobResultsStatus } from '@/modules/notifications/services/publication/queue'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { NotificationsEvents } from '@/modules/notifications/domain/events'
import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper'
!isNotificationListenerEnabled()
? describe('Notifications', () => {
let notificationsState: NotificationsStateManager
before(async () => {
await purgeNotifications()
notificationsState = await buildNotificationsStateTracker()
})
after(async () => {
notificationsState.destroy()
})
afterEach(() => {
notificationsState.reset()
})
it('can be emitted and routed to proper handler on consumption', async () => {
const targetUserId = '1234555'
const data: MentionedInCommentData = {
threadId: 'aaa',
commentId: 'bbb',
authorId: 'ccc',
streamId: 'ddd'
}
// Enqueue notification
const msgId = await publishNotification(NotificationType.MentionedInComment, {
targetUserId,
data
})
// Wait for ack
await notificationsState.waitForMsgAck(msgId)
const enqueuedMessage = notificationsState.collectedMessages().at(-1)!
expect(enqueuedMessage).to.be.ok
expect(enqueuedMessage?.targetUserId).to.eq(targetUserId)
expect(enqueuedMessage?.type).to.eq(NotificationType.MentionedInComment)
expect(enqueuedMessage?.data).to.deep.equalInAnyOrder(data)
})
it('fail safely when emitted with an unexpected structure', async () => {
// Enqueue notification with invalid structure
const msgId = await publishNotification(NotificationType.MentionedInComment, {
a: 1,
b: 2
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} as any)
const { err } = await notificationsState.waitForMsgAck(msgId)
expect(err).to.be.ok
expect(err instanceof InvalidNotificationError).to.be.true
expect(err?.message).to.contain('invalid notification')
})
it('fail safely when emitted with an unexpected type', async () => {
// Enqueue notification with invalid structure
const msgId = await publishNotification('booooooooo' as NotificationType, {
targetUserId: '123',
data: {
a: 123
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} as any
})
const { err } = await notificationsState.waitForMsgAck(msgId)
expect(err).to.be.ok
expect(err instanceof UnhandledNotificationError).to.be.true
})
const validationErrorDataSet = [
{
display: 'successful',
error: new NotificationValidationError('expected validation isue')
},
{ display: 'unsuccessful', error: new Error('ooohhhh') }
]
validationErrorDataSet.forEach(({ display, error }) => {
it(`fail with ${display} ack when handler throws ${error.name}`, async () => {
const data: MentionedInCommentData = {
threadId: 'aaa',
commentId: 'bbb',
authorId: 'ccc',
streamId: 'ddd'
}
getEventBus().listenOnce(NotificationsEvents.Received, () => {
throw error
})
const msgId = await publishNotification(NotificationType.MentionedInComment, {
targetUserId: '123',
data
})
const { err, result } = await notificationsState.waitForMsgAck(msgId)
const isValidationError = error instanceof NotificationValidationError
if (isValidationError) {
expect(err).to.be.not.ok
expect(result?.status).to.eq(NotificationJobResultsStatus.ValidationError)
} else {
expect(err).to.be.ok
expect(err?.name).to.eq(error.name)
expect(err?.message).to.eq(error.message)
expect(result).to.be.not.ok
}
})
})
})
: {}
@@ -558,6 +558,3 @@ export function getOdaUserSecret() {
export const areSavedViewsEnabled = (): boolean =>
getFeatureFlags().FF_SAVED_VIEWS_ENABLED
export const isNotificationListenerEnabled = (): boolean =>
getFeatureFlags().FF_NOTIFICATION_LISTENER_ENABLED
@@ -154,14 +154,6 @@ export type EventPayload<T extends EventSubscriptionKey> = T extends AllEventsWi
? EventPayloadsMap[T]
: never
/**
* To single specify which event to use
*/
export type EventType<EventName extends EventNames> = {
eventName: EventName
payload: EventTypes[EventName]
}
export function initializeEventBus() {
const emitter = new EventEmitter({ wildcard: true })
@@ -171,9 +163,10 @@ export function initializeEventBus() {
* execute. Any errors thrown in the listeners will bubble up and throw from
* the part of code that triggers this emit() call.
*/
emit: async <EventName extends EventNames>(
args: EventType<EventName>
): Promise<void> => {
emit: async <EventName extends EventNames>(args: {
eventName: EventName
payload: EventTypes[EventName]
}): Promise<void> => {
// curate the proper payload here and eventName object here, before emitting
await emitter.emitAsync(args.eventName, args)
},
@@ -1,36 +0,0 @@
import gql from 'graphql-tag'
export const getUserNotifications = gql`
query GetUserNotifications($limit: Int, $cursor: String) {
activeUser {
notifications(limit: $limit, cursor: $cursor) {
items {
id
type
createdAt
payload
read
updatedAt
}
cursor
totalCount
}
}
}
`
export const deleteUserNotifications = gql`
mutation UserBulkDeleteNotidication($ids: [String!]!) {
notificationMutations {
bulkDelete(ids: $ids)
}
}
`
export const updateUserNotifications = gql`
mutation UserBulkUpdateNotifications($input: [NotificationUpdateInput!]!) {
notificationMutations {
bulkUpdate(input: $input)
}
}
`
+2 -2
View File
@@ -1,6 +1,6 @@
import { notificationsLogger as logger } from '@/observability/logging'
import type { NotificationJobResult } from '@/modules/notifications/services/publication/queue'
import { getQueue } from '@/modules/notifications/services/publication/queue'
import type { NotificationJobResult } from '@/modules/notifications/services/queue'
import { getQueue } from '@/modules/notifications/services/queue'
import { EventEmitter } from 'events'
import type { CompletedEventCallback, FailedEventCallback, JobId } from 'bull'
import { pick } from 'lodash-es'
-11
View File
@@ -108,7 +108,6 @@
"./acc": "./src/acc/index.ts",
"./dist/*": "./dist/*",
"./images/base64": "./src/images/base64.ts",
"./notifications": "./src/notifications/index.ts",
"./saved-views": "./src/saved-views/index.ts"
},
"exclude": [
@@ -346,16 +345,6 @@
"default": "./dist/commonjs/images/base64.js"
}
},
"./notifications": {
"import": {
"types": "./dist/esm/notifications/index.d.ts",
"default": "./dist/esm/notifications/index.js"
},
"require": {
"types": "./dist/commonjs/notifications/index.d.ts",
"default": "./dist/commonjs/notifications/index.js"
}
},
"./saved-views": {
"import": {
"types": "./dist/esm/saved-views/index.d.ts",
-7
View File
@@ -120,10 +120,6 @@ export const Scopes = Object.freeze(<const>{
Email: 'users:email',
Invite: 'users:invite'
},
Notifications: {
Read: 'notifications:read',
Write: 'notifications:write'
},
Server: {
Stats: 'server:stats',
Setup: 'server:setup'
@@ -157,8 +153,6 @@ export const Scopes = Object.freeze(<const>{
export type StreamScopes = (typeof Scopes)['Streams'][keyof (typeof Scopes)['Streams']]
export type ProfileScopes = (typeof Scopes)['Profile'][keyof (typeof Scopes)['Profile']]
export type UserScopes = (typeof Scopes)['Users'][keyof (typeof Scopes)['Users']]
export type NotificationScopes =
(typeof Scopes)['Notifications'][keyof (typeof Scopes)['Notifications']]
export type ServerScopes = (typeof Scopes)['Server'][keyof (typeof Scopes)['Server']]
export type TokenScopes = (typeof Scopes)['Tokens'][keyof (typeof Scopes)['Tokens']]
export type AppScopes = (typeof Scopes)['Apps'][keyof (typeof Scopes)['Apps']]
@@ -181,7 +175,6 @@ export type AvailableScopes =
| AutomateScopes
| AutomateFunctionScopes
| WorkspaceScopes
| NotificationScopes
/**
* All scopes
@@ -22,5 +22,4 @@ export type FeatureFlags = {
FF_DASHBOARDS_MODULE_ENABLED: boolean
FF_SAVED_VIEWS_ENABLED: boolean
FF_USERS_INVITE_SCOPE_IS_PUBLIC: boolean
FF_NOTIFICATION_LISTENER_ENABLED: boolean
}
-5
View File
@@ -150,11 +150,6 @@ export const parseFeatureFlags = (
description:
'Enables Personal Access Tokens (PAT) to be created with users:invite scope. **WARNING** This can be used to spam invitations to any email address. It is not advised to enable this on servers which are open to public account registration or to which untrusted users have been, or can be, invited.',
defaults: { _: false }
},
FF_NOTIFICATION_LISTENER_ENABLED: {
schema: z.boolean(),
description: 'Enables notifications being triggered by server event listeners',
defaults: { _: false }
}
})
@@ -1,26 +0,0 @@
export enum NotificationType {
/**
* @deprecated ActivityDigest will be removed in a future release
*/
ActivityDigest = 'activityDigest',
MentionedInComment = 'mentionedInComment',
NewStreamAccessRequest = 'newStreamAccessRequest',
StreamAccessRequestApproved = 'streamAccessRequestApproved'
}
export type NotificationPayloadMap = {
[NotificationType.MentionedInComment]: {
threadId: string
authorId: string
commentId: string
streamId: string
}
[NotificationType.NewStreamAccessRequest]: {
streamId: string
requesterId: string
}
[NotificationType.StreamAccessRequestApproved]: {
streamId: string
}
[NotificationType.ActivityDigest]: Record<string, unknown>
}
@@ -1 +0,0 @@
export * from './helpers/types.js'
@@ -613,9 +613,6 @@ Generate the environment variables for Speckle server and Speckle objects deploy
- name: FF_NO_PERSONAL_EMAILS_ENABLED
value: {{ .Values.featureFlags.noPersonalEmailsEnabled | quote }}
- name: FF_NOTIFICATION_LISTENER_ENABLED
value: {{ .Values.featureFlags.notificationListenerEnabled | quote }}
{{- if .Values.featureFlags.accIntegrationEnabled }}
- name: AUTODESK_INTEGRATION_CLIENT_ID
value: {{ .Values.server.accIntegration.client_id }}