From 3ca4a11ca39e85a51edae3ca6cccb7c19423b94e Mon Sep 17 00:00:00 2001 From: Daniel Gak Anagrov Date: Mon, 6 Oct 2025 13:19:12 +0200 Subject: [PATCH] feat(notifications): basic listener structure, notification record, delayed mechanism (#5432) * feat: basic notification listener sturcuture * feat: clean up generated gql * chore: edited structure * feat: added basic repo * feat: ported comment email to job queue * feat: ported stream access request accepted * feat: added notification insertion * fix: minor typings * feat: delayed notifications * updated types * feat: fixed gql * notifications are listed * index on notifications * feat: while loop skiping for update locked * delayed notification for access request * take into account user prefrences * on comment view, notification is marked as read * feat: added gql notifications * feat: avoid raising errors * fix: error added scopes * fix: mr comments * fix: cursor and service method * feat: added stronger types to notifications and versioning logic * minor: rows updated --- .../lib/common/generated/gql/graphql.ts | 78 ++++++ .../typedefs/notifications.graphql | 48 ++++ packages/server/codegen.ts | 2 + .../server/modules/accessrequests/index.ts | 2 +- .../accessrequests/services/eventListener.ts | 2 +- .../tests/projectAccessRequests.spec.ts | 48 ++-- .../tests/streamAccessRequests.spec.ts | 50 ++-- .../server/modules/activitystream/index.ts | 2 +- .../activitystream/services/summary.ts | 2 +- .../tests/integration/activitySummary.spec.ts | 2 +- .../modules/cli/commands/activities/send.ts | 6 +- .../modules/cli/commands/bull/monitor.ts | 2 +- .../modules/cli/commands/bull/test-consume.ts | 6 +- .../modules/cli/commands/bull/test-push.ts | 8 +- .../server/modules/comments/domain/events.ts | 5 + .../comments/graph/resolvers/comments.ts | 17 +- packages/server/modules/comments/index.ts | 2 +- .../modules/comments/services/management.ts | 18 ++ .../comments/services/notifications.ts | 54 +--- .../modules/comments/tests/comments.spec.ts | 41 +-- packages/server/modules/core/dbSchema.ts | 12 + .../modules/core/graph/generated/graphql.ts | 114 ++++++++ packages/server/modules/core/scopes.ts | 10 + .../notifications/domain/operations.ts | 50 +++- .../events/notificationListener.ts | 55 ++++ .../resolvers/userNotificationPreferences.ts | 4 +- .../graph/resolvers/userNotifications.ts | 98 +++++++ .../notifications/helpers/toLatestVersion.ts | 45 ++++ .../modules/notifications/helpers/types.ts | 40 ++- .../server/modules/notifications/index.ts | 51 +++- .../20250915154600_user_notifications.ts | 28 ++ .../repositories/userNotification.ts | 126 +++++++++ .../userNotificationPreferences.ts} | 0 .../handlers/createdOrUpdatedComment.ts | 251 ++++++++++++++++++ .../handlers/streamAccessRequestCreated.ts | 157 +++++++++++ .../handlers/streamAccessRequestFinalized.ts | 114 ++++++++ .../notifications/services/events/queue.ts | 102 +++++++ .../services/notificationPreferences.ts | 19 +- .../handlers/activityDigest.ts | 2 +- .../handlers/mentionedInComment.ts | 4 +- .../handlers/newStreamAccessRequest.ts | 0 .../handlers/streamAccessRequestApproved.ts | 0 .../publishNotification.ts} | 8 +- .../services/{ => publication}/queue.ts | 6 +- .../tasks/delayedNotifications.ts | 105 ++++++++ .../tasks/handlers/mentionedInComment.ts | 150 +++++++++++ .../tasks/handlers/newStreamAccessRequest.ts | 170 ++++++++++++ .../handlers/streamAccessRequestApproved.ts | 134 ++++++++++ .../tests/activityDigest.spec.ts | 4 +- .../modules/notifications/tests/helpers.ts | 35 +++ .../tests/notifications.graph.spec.ts | 184 +++++++++++++ .../notifications/tests/notifications.spec.ts | 130 --------- .../tests/notificationsPreferences.spec.ts | 8 +- .../tests/pushNotifications.spec.ts | 133 ++++++++++ .../modules/shared/helpers/envHelper.ts | 3 + .../modules/shared/services/eventBus.ts | 15 +- packages/server/test/graphql/notifications.ts | 36 +++ packages/server/test/notificationsHelper.ts | 4 +- packages/shared/package.json | 11 + packages/shared/src/core/constants.ts | 7 + .../shared/src/environment/featureFlags.ts | 1 + packages/shared/src/environment/index.ts | 5 + .../shared/src/notifications/helpers/types.ts | 26 ++ packages/shared/src/notifications/index.ts | 1 + .../speckle-server/templates/_helpers.tpl | 3 + 65 files changed, 2554 insertions(+), 302 deletions(-) create mode 100644 packages/server/assets/notifications/typedefs/notifications.graphql create mode 100644 packages/server/modules/notifications/events/notificationListener.ts create mode 100644 packages/server/modules/notifications/graph/resolvers/userNotifications.ts create mode 100644 packages/server/modules/notifications/helpers/toLatestVersion.ts create mode 100644 packages/server/modules/notifications/migrations/20250915154600_user_notifications.ts create mode 100644 packages/server/modules/notifications/repositories/userNotification.ts rename packages/server/modules/notifications/{repositories.ts => repositories/userNotificationPreferences.ts} (100%) create mode 100644 packages/server/modules/notifications/services/events/handlers/createdOrUpdatedComment.ts create mode 100644 packages/server/modules/notifications/services/events/handlers/streamAccessRequestCreated.ts create mode 100644 packages/server/modules/notifications/services/events/handlers/streamAccessRequestFinalized.ts create mode 100644 packages/server/modules/notifications/services/events/queue.ts rename packages/server/modules/notifications/services/{ => publication}/handlers/activityDigest.ts (99%) rename packages/server/modules/notifications/services/{ => publication}/handlers/mentionedInComment.ts (99%) rename packages/server/modules/notifications/services/{ => publication}/handlers/newStreamAccessRequest.ts (100%) rename packages/server/modules/notifications/services/{ => publication}/handlers/streamAccessRequestApproved.ts (100%) rename packages/server/modules/notifications/services/{publication.ts => publication/publishNotification.ts} (58%) rename packages/server/modules/notifications/services/{ => publication}/queue.ts (96%) create mode 100644 packages/server/modules/notifications/tasks/delayedNotifications.ts create mode 100644 packages/server/modules/notifications/tasks/handlers/mentionedInComment.ts create mode 100644 packages/server/modules/notifications/tasks/handlers/newStreamAccessRequest.ts create mode 100644 packages/server/modules/notifications/tasks/handlers/streamAccessRequestApproved.ts create mode 100644 packages/server/modules/notifications/tests/helpers.ts create mode 100644 packages/server/modules/notifications/tests/notifications.graph.spec.ts delete mode 100644 packages/server/modules/notifications/tests/notifications.spec.ts create mode 100644 packages/server/modules/notifications/tests/pushNotifications.spec.ts create mode 100644 packages/server/test/graphql/notifications.ts create mode 100644 packages/shared/src/notifications/helpers/types.ts create mode 100644 packages/shared/src/notifications/index.ts diff --git a/packages/frontend-2/lib/common/generated/gql/graphql.ts b/packages/frontend-2/lib/common/generated/gql/graphql.ts index 2de22d19f..c0f4af179 100644 --- a/packages/frontend-2/lib/common/generated/gql/graphql.ts +++ b/packages/frontend-2/lib/common/generated/gql/graphql.ts @@ -2064,6 +2064,7 @@ export type Mutation = { */ inviteResend: Scalars['Boolean']['output']; modelMutations: ModelMutations; + notificationMutations: NotificationMutations; /** @deprecated Part of the old API surface and will be removed in the future. */ objectCreate: Array; projectMutations: ProjectMutations; @@ -2457,6 +2458,39 @@ export type MutationWebhookUpdateArgs = { webhook: WebhookUpdateInput; }; +export type Notification = { + __typename?: 'Notification'; + createdAt: Scalars['DateTime']['output']; + id: Scalars['ID']['output']; + payload: Scalars['JSONObject']['output']; + read: Scalars['Boolean']['output']; + type: Scalars['String']['output']; + updatedAt: Scalars['DateTime']['output']; +}; + +export type NotificationMutations = { + __typename?: 'NotificationMutations'; + /** Delete an existing notification */ + bulkDelete: Scalars['Boolean']['output']; + /** update notidication */ + bulkUpdate: Scalars['Boolean']['output']; +}; + + +export type NotificationMutationsBulkDeleteArgs = { + ids: Array; +}; + + +export type NotificationMutationsBulkUpdateArgs = { + ids: Array; + input: NotificationUpdateInput; +}; + +export type NotificationUpdateInput = { + read: Scalars['Boolean']['input']; +}; + export type Object = { __typename?: 'Object'; /** @deprecated Not implemented. */ @@ -4944,6 +4978,8 @@ export type User = { meta: UserMeta; name: Scalars['String']['output']; notificationPreferences: Scalars['JSONObject']['output']; + /** List all notifications for the user */ + notifications: UserNotificationCollection; permissions: RootPermissionChecks; profiles?: Maybe; /** Get pending project access request, that the user made */ @@ -5029,6 +5065,16 @@ export type UserFavoriteStreamsArgs = { }; +/** + * Full user type, should only be used in the context of admin operations or + * when a user is reading/writing info about himself + */ +export type UserNotificationsArgs = { + cursor?: InputMaybe; + limit?: InputMaybe; +}; + + /** * Full user type, should only be used in the context of admin operations or * when a user is reading/writing info about himself @@ -5219,6 +5265,13 @@ export type UserMetaMutationsSetSpeckleConBannerDismissedArgs = { value: Scalars['Boolean']['input']; }; +export type UserNotificationCollection = { + __typename?: 'UserNotificationCollection'; + cursor?: Maybe; + items: Array; + totalCount: Scalars['Int']['output']; +}; + export type UserProjectCollection = { __typename?: 'UserProjectCollection'; cursor?: Maybe; @@ -9470,6 +9523,8 @@ export type AllObjectTypes = { ModelsTreeItem: ModelsTreeItem, ModelsTreeItemCollection: ModelsTreeItemCollection, Mutation: Mutation, + Notification: Notification, + NotificationMutations: NotificationMutations, Object: Object, ObjectCollection: ObjectCollection, PasswordStrengthCheckFeedback: PasswordStrengthCheckFeedback, @@ -9550,6 +9605,7 @@ export type AllObjectTypes = { UserGendoAICredits: UserGendoAiCredits, UserMeta: UserMeta, UserMetaMutations: UserMetaMutations, + UserNotificationCollection: UserNotificationCollection, UserProjectCollection: UserProjectCollection, UserProjectsUpdatedMessage: UserProjectsUpdatedMessage, UserSearchResultCollection: UserSearchResultCollection, @@ -10299,6 +10355,7 @@ export type MutationFieldArgs = { inviteDelete: MutationInviteDeleteArgs, inviteResend: MutationInviteResendArgs, modelMutations: {}, + notificationMutations: {}, objectCreate: MutationObjectCreateArgs, projectMutations: {}, requestVerification: {}, @@ -10334,6 +10391,18 @@ export type MutationFieldArgs = { workspaceJoinRequestMutations: {}, workspaceMutations: {}, } +export type NotificationFieldArgs = { + createdAt: {}, + id: {}, + payload: {}, + read: {}, + type: {}, + updatedAt: {}, +} +export type NotificationMutationsFieldArgs = { + bulkDelete: NotificationMutationsBulkDeleteArgs, + bulkUpdate: NotificationMutationsBulkUpdateArgs, +} export type ObjectFieldArgs = { applicationId: {}, children: ObjectChildrenArgs, @@ -10986,6 +11055,7 @@ export type UserFieldArgs = { meta: {}, name: {}, notificationPreferences: {}, + notifications: UserNotificationsArgs, permissions: {}, profiles: {}, projectAccessRequest: UserProjectAccessRequestArgs, @@ -11040,6 +11110,11 @@ export type UserMetaMutationsFieldArgs = { setSpeckleCon25BannerDismissed: UserMetaMutationsSetSpeckleCon25BannerDismissedArgs, setSpeckleConBannerDismissed: UserMetaMutationsSetSpeckleConBannerDismissedArgs, } +export type UserNotificationCollectionFieldArgs = { + cursor: {}, + items: {}, + totalCount: {}, +} export type UserProjectCollectionFieldArgs = { cursor: {}, items: {}, @@ -11455,6 +11530,8 @@ export type AllObjectFieldArgTypes = { ModelsTreeItem: ModelsTreeItemFieldArgs, ModelsTreeItemCollection: ModelsTreeItemCollectionFieldArgs, Mutation: MutationFieldArgs, + Notification: NotificationFieldArgs, + NotificationMutations: NotificationMutationsFieldArgs, Object: ObjectFieldArgs, ObjectCollection: ObjectCollectionFieldArgs, PasswordStrengthCheckFeedback: PasswordStrengthCheckFeedbackFieldArgs, @@ -11535,6 +11612,7 @@ export type AllObjectFieldArgTypes = { UserGendoAICredits: UserGendoAiCreditsFieldArgs, UserMeta: UserMetaFieldArgs, UserMetaMutations: UserMetaMutationsFieldArgs, + UserNotificationCollection: UserNotificationCollectionFieldArgs, UserProjectCollection: UserProjectCollectionFieldArgs, UserProjectsUpdatedMessage: UserProjectsUpdatedMessageFieldArgs, UserSearchResultCollection: UserSearchResultCollectionFieldArgs, diff --git a/packages/server/assets/notifications/typedefs/notifications.graphql b/packages/server/assets/notifications/typedefs/notifications.graphql new file mode 100644 index 000000000..81e99bc3e --- /dev/null +++ b/packages/server/assets/notifications/typedefs/notifications.graphql @@ -0,0 +1,48 @@ +type UserNotificationCollection { + totalCount: Int! + cursor: String + items: [Notification!]! +} + +type Notification { + id: ID! + type: String! + read: Boolean! + payload: JSONObject! + createdAt: DateTime! + updatedAt: DateTime! +} + +input NotificationUpdateInput { + id: ID! + read: Boolean! +} + +extend type User { + """ + List all notifications for the user + """ + notifications(cursor: String, limit: Int): UserNotificationCollection! + @isOwner + @hasScope(scope: "notifications:read") +} + +extend type Mutation { + notificationMutations: NotificationMutations! @hasServerRole(role: SERVER_GUEST) +} + +type NotificationMutations { + """ + Delete an existing notification + """ + bulkDelete(ids: [String!]!): Boolean! + @hasServerRole(role: SERVER_GUEST) + @hasScope(scope: "notifications:write") + + """ + update notidication + """ + bulkUpdate(input: [NotificationUpdateInput!]!): Boolean! + @hasServerRole(role: SERVER_GUEST) + @hasScope(scope: "notifications:write") +} diff --git a/packages/server/codegen.ts b/packages/server/codegen.ts index 0de5af7a3..f65806a19 100644 --- a/packages/server/codegen.ts +++ b/packages/server/codegen.ts @@ -67,6 +67,8 @@ const config: CodegenConfig = { '@/modules/core/helpers/graphTypes#MutationsObjectGraphQLReturn', AdminMutations: '@/modules/core/helpers/graphTypes#MutationsObjectGraphQLReturn', + NotificationMutations: + '@/modules/core/helpers/graphTypes#MutationsObjectGraphQLReturn', AdminQueries: '@/modules/core/helpers/graphTypes#GraphQLEmptyReturn', ServerStatistics: '@/modules/core/helpers/graphTypes#GraphQLEmptyReturn', ServerStats: '@/modules/core/helpers/graphTypes#GraphQLEmptyReturn', diff --git a/packages/server/modules/accessrequests/index.ts b/packages/server/modules/accessrequests/index.ts index 2f536c316..59e546bf2 100644 --- a/packages/server/modules/accessrequests/index.ts +++ b/packages/server/modules/accessrequests/index.ts @@ -2,7 +2,7 @@ import { db } from '@/db/knex' import { moduleLogger } from '@/observability/logging' import { initializeEventListenerFactory } from '@/modules/accessrequests/services/eventListener' import { getStreamCollaboratorsFactory } from '@/modules/core/repositories/streams' -import { publishNotification } from '@/modules/notifications/services/publication' +import { publishNotification } from '@/modules/notifications/services/publication/publishNotification' import type { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper' import { getEventBus } from '@/modules/shared/services/eventBus' diff --git a/packages/server/modules/accessrequests/services/eventListener.ts b/packages/server/modules/accessrequests/services/eventListener.ts index 929503cbf..72ff6f15b 100644 --- a/packages/server/modules/accessrequests/services/eventListener.ts +++ b/packages/server/modules/accessrequests/services/eventListener.ts @@ -3,7 +3,7 @@ import { isStreamAccessRequest } from '@/modules/accessrequests/repositories' import type { GetStreamCollaborators } from '@/modules/core/domain/streams/operations' import { Roles } from '@/modules/core/helpers/mainConstants' import type { NotificationPublisher } from '@/modules/notifications/helpers/types' -import { NotificationType } from '@/modules/notifications/helpers/types' +import { NotificationType } from '@speckle/shared/notifications' import type { EventBus, EventPayload } from '@/modules/shared/services/eventBus' type OnServerAccessRequestCreatedDeps = { diff --git a/packages/server/modules/accessrequests/tests/projectAccessRequests.spec.ts b/packages/server/modules/accessrequests/tests/projectAccessRequests.spec.ts index 34bf317ce..dcc874a37 100644 --- a/packages/server/modules/accessrequests/tests/projectAccessRequests.spec.ts +++ b/packages/server/modules/accessrequests/tests/projectAccessRequests.spec.ts @@ -37,7 +37,7 @@ import { removeStreamCollaboratorFactory, validateStreamAccessFactory } from '@/modules/core/services/streams/access' -import { NotificationType } from '@/modules/notifications/helpers/types' +import { NotificationType } from '@speckle/shared/notifications' import { authorizeResolver } from '@/modules/shared' import { getEventBus } from '@/modules/shared/services/eventBus' import type { BasicTestUser } from '@/test/authHelper' @@ -62,6 +62,7 @@ import type { BasicTestStream } from '@/test/speckle-helpers/streamHelper' import { createTestStreams } from '@/test/speckle-helpers/streamHelper' import { expect } from 'chai' import { noop } from 'lodash-es' +import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper' const getUser = getUserFactory({ db }) const getStream = getStreamFactory({ db }) @@ -236,12 +237,6 @@ describe('Project access requests', () => { }) ) - const { getSends } = emailListener.listen({ times: 1 }) - - const waitForAck = notificationsStateManager.waitForAck( - (e) => e.result?.type === NotificationType.NewStreamAccessRequest - ) - const results = await createReq(otherGuysPrivateStream.id) // req gets created @@ -257,18 +252,6 @@ describe('Project access requests', () => { expect(results.data?.projectMutations.accessRequestMutations.create.projectId).to .be.ok - await waitForAck - - // email gets sent out - const sentEmails = getSends() - expect(sentEmails.length).to.eq(1) - const emailParams = sentEmails[0] - - expect(emailParams.subject).to.contain('A user requested access to your project') - expect(emailParams.html).to.be.ok - expect(emailParams.text).to.be.ok - expect(emailParams.to).to.eq(otherGuy.email) - // activity stream item inserted const streamActivity = await getStreamActivities(otherGuysPrivateStream.id, { actionType: StreamActionTypes.Stream.AccessRequestSent, @@ -278,6 +261,33 @@ describe('Project access requests', () => { expect(eventFired).to.be.true }) + !isNotificationListenerEnabled() + ? it('sends an email notification ', async () => { + const { getSends } = emailListener.listen({ times: 1 }) + + const waitForAck = notificationsStateManager.waitForAck( + (e) => e.result?.type === NotificationType.NewStreamAccessRequest + ) + + const results = await createReq(otherGuysPrivateStream.id) + expect(results).to.not.haveGraphQLErrors() + + await waitForAck + + // email gets sent out + const sentEmails = getSends() + expect(sentEmails.length).to.eq(1) + const emailParams = sentEmails[0] + + expect(emailParams.subject).to.contain( + 'A user requested access to your project' + ) + expect(emailParams.html).to.be.ok + expect(emailParams.text).to.be.ok + expect(emailParams.to).to.eq(otherGuy.email) + }) + : {} + it('operation fails if request already exists', async () => { const firstResults = await createReq(otherGuysPrivateStream.id) expect(firstResults).to.not.haveGraphQLErrors() diff --git a/packages/server/modules/accessrequests/tests/streamAccessRequests.spec.ts b/packages/server/modules/accessrequests/tests/streamAccessRequests.spec.ts index d3f943eac..925e00061 100644 --- a/packages/server/modules/accessrequests/tests/streamAccessRequests.spec.ts +++ b/packages/server/modules/accessrequests/tests/streamAccessRequests.spec.ts @@ -38,7 +38,7 @@ import { removeStreamCollaboratorFactory, validateStreamAccessFactory } from '@/modules/core/services/streams/access' -import { NotificationType } from '@/modules/notifications/helpers/types' +import { NotificationType } from '@speckle/shared/notifications' import { authorizeResolver } from '@/modules/shared' import { getEventBus } from '@/modules/shared/services/eventBus' import type { BasicTestUser } from '@/test/authHelper' @@ -63,6 +63,7 @@ import type { BasicTestStream } from '@/test/speckle-helpers/streamHelper' import { createTestStreams } from '@/test/speckle-helpers/streamHelper' import { expect } from 'chai' import { noop } from 'lodash-es' +import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper' const getUser = getUserFactory({ db }) const getStreamCollaborators = getStreamCollaboratorsFactory({ db }) @@ -212,12 +213,6 @@ describe('Stream access requests', () => { }) it('operation succeeds', async () => { - const { getSends } = emailListener.listen({ times: 1 }) - - const waitForAck = notificationsStateManager.waitForAck( - (e) => e.result?.type === NotificationType.NewStreamAccessRequest - ) - const results = await createReq(otherGuysPrivateStream.id) // req gets created @@ -230,18 +225,6 @@ describe('Stream access requests', () => { ) expect(results.data?.streamAccessRequestCreate.streamId).to.be.ok - await waitForAck - - // email gets sent out - const sentEmails = getSends() - expect(sentEmails.length).to.eq(1) - const emailParams = sentEmails[0] - - expect(emailParams.subject).to.contain('A user requested access to your project') - expect(emailParams.html).to.be.ok - expect(emailParams.text).to.be.ok - expect(emailParams.to).to.eq(otherGuy.email) - // activity stream item inserted const streamActivity = await getStreamActivities(otherGuysPrivateStream.id, { actionType: StreamActionTypes.Stream.AccessRequestSent, @@ -250,6 +233,35 @@ describe('Stream access requests', () => { expect(streamActivity).to.have.lengthOf(1) }) + !isNotificationListenerEnabled() + ? it('sends an email notification', async () => { + const { getSends } = emailListener.listen({ times: 1 }) + + const waitForAck = notificationsStateManager.waitForAck( + (e) => e.result?.type === NotificationType.NewStreamAccessRequest + ) + + const results = await createReq(otherGuysPrivateStream.id) + + // req gets created + expect(results).to.not.haveGraphQLErrors() + + await waitForAck + + // email gets sent out + const sentEmails = getSends() + expect(sentEmails.length).to.eq(1) + const emailParams = sentEmails[0] + + expect(emailParams.subject).to.contain( + 'A user requested access to your project' + ) + expect(emailParams.html).to.be.ok + expect(emailParams.text).to.be.ok + expect(emailParams.to).to.eq(otherGuy.email) + }) + : {} + it('operation fails if request already exists', async () => { const firstResults = await createReq(otherGuysPrivateStream.id) expect(firstResults).to.not.haveGraphQLErrors() diff --git a/packages/server/modules/activitystream/index.ts b/packages/server/modules/activitystream/index.ts index d66ebfbff..c3bb40684 100644 --- a/packages/server/modules/activitystream/index.ts +++ b/packages/server/modules/activitystream/index.ts @@ -1,6 +1,6 @@ import type cron from 'node-cron' import type { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper' -import { publishNotification } from '@/modules/notifications/services/publication' +import { publishNotification } from '@/modules/notifications/services/publication/publishNotification' import { moduleLogger } from '@/observability/logging' import { weeklyEmailDigestEnabled } from '@/modules/shared/helpers/envHelper' import type { EventBus } from '@/modules/shared/services/eventBus' diff --git a/packages/server/modules/activitystream/services/summary.ts b/packages/server/modules/activitystream/services/summary.ts index ee0cd01aa..525c18c36 100644 --- a/packages/server/modules/activitystream/services/summary.ts +++ b/packages/server/modules/activitystream/services/summary.ts @@ -1,5 +1,5 @@ import type { NotificationPublisher } from '@/modules/notifications/helpers/types' -import { NotificationType } from '@/modules/notifications/helpers/types' +import { NotificationType } from '@speckle/shared/notifications' import type { CreateActivitySummary, GetActiveUserStreams, diff --git a/packages/server/modules/activitystream/tests/integration/activitySummary.spec.ts b/packages/server/modules/activitystream/tests/integration/activitySummary.spec.ts index fe6996fc5..7f2117860 100644 --- a/packages/server/modules/activitystream/tests/integration/activitySummary.spec.ts +++ b/packages/server/modules/activitystream/tests/integration/activitySummary.spec.ts @@ -13,9 +13,9 @@ import { } from '@/modules/activitystream/helpers/types' import type { ActivityDigestMessage, - NotificationType, NotificationTypeMessageMap } from '@/modules/notifications/helpers/types' +import type { NotificationType } from '@speckle/shared/notifications' import { geUserStreamActivityFactory, saveStreamActivityFactory diff --git a/packages/server/modules/cli/commands/activities/send.ts b/packages/server/modules/cli/commands/activities/send.ts index e25a64943..7486e91b6 100644 --- a/packages/server/modules/cli/commands/activities/send.ts +++ b/packages/server/modules/cli/commands/activities/send.ts @@ -1,6 +1,6 @@ import type { CommandModule } from 'yargs' -import { initializeQueue } from '@/modules/notifications/services/queue' -import { publishNotification } from '@/modules/notifications/services/publication' +import { initializePublicationQueue } from '@/modules/notifications/services/publication/queue' +import { publishNotification } from '@/modules/notifications/services/publication/publishNotification' import { cliLogger } from '@/observability/logging' import { sendActivityNotificationsFactory } from '@/modules/activitystream/services/summary' import { getActiveUserStreamsFactory } from '@/modules/activitystream/repositories' @@ -18,7 +18,7 @@ const command: CommandModule = { }) }, handler: async (argv) => { - await initializeQueue() + await initializePublicationQueue() const numberOfDays = argv.days as number const end = new Date() const start = new Date(end.getTime()) diff --git a/packages/server/modules/cli/commands/bull/monitor.ts b/packages/server/modules/cli/commands/bull/monitor.ts index 104b1add3..240de247a 100644 --- a/packages/server/modules/cli/commands/bull/monitor.ts +++ b/packages/server/modules/cli/commands/bull/monitor.ts @@ -6,7 +6,7 @@ import { BullAdapter } from '@bull-board/api/bullAdapter' import { NOTIFICATIONS_QUEUE, buildNotificationsQueue -} from '@/modules/notifications/services/queue' +} from '@/modules/notifications/services/publication/queue' import { noop } from 'lodash-es' import { cliLogger } from '@/observability/logging' diff --git a/packages/server/modules/cli/commands/bull/test-consume.ts b/packages/server/modules/cli/commands/bull/test-consume.ts index a22b0d911..d75290b69 100644 --- a/packages/server/modules/cli/commands/bull/test-consume.ts +++ b/packages/server/modules/cli/commands/bull/test-consume.ts @@ -1,6 +1,6 @@ import { cliLogger } from '@/observability/logging' -import { NotificationType } from '@/modules/notifications/helpers/types' -import { initializeConsumption } from '@/modules/notifications/index' +import { NotificationType } from '@speckle/shared/notifications' +import { initializePublicationConsumption } from '@/modules/notifications/index' import { EnvironmentResourceError } from '@/modules/shared/errors' import { get, noop } from 'lodash-es' import type { CommandModule } from 'yargs' @@ -12,7 +12,7 @@ const command: CommandModule = { cliLogger.info('Starting consumption...') // Overriding handler for test purposes, we don't want the actual mentions logic to run - await initializeConsumption({ + await initializePublicationConsumption({ [NotificationType.MentionedInComment]: async (msg, { logger, job }) => { logger.info('Received test message with payload', msg, job) diff --git a/packages/server/modules/cli/commands/bull/test-push.ts b/packages/server/modules/cli/commands/bull/test-push.ts index 1b04e164f..3bea9fe2e 100644 --- a/packages/server/modules/cli/commands/bull/test-push.ts +++ b/packages/server/modules/cli/commands/bull/test-push.ts @@ -1,8 +1,8 @@ import { cliLogger } from '@/observability/logging' import type { MentionedInCommentData } from '@/modules/notifications/helpers/types' -import { NotificationType } from '@/modules/notifications/helpers/types' -import { publishNotification } from '@/modules/notifications/services/publication' -import { initializeQueue } from '@/modules/notifications/services/queue' +import { NotificationType } from '@speckle/shared/notifications' +import { publishNotification } from '@/modules/notifications/services/publication/publishNotification' +import { initializePublicationQueue } from '@/modules/notifications/services/publication/queue' import type { CommandModule } from 'yargs' const command: CommandModule = { @@ -22,7 +22,7 @@ const command: CommandModule = { }) }, handler: async (argv) => { - await initializeQueue() + await initializePublicationQueue() // we don't want to submit a real mentions payload, this is for testing only await publishNotification(NotificationType.MentionedInComment, { diff --git a/packages/server/modules/comments/domain/events.ts b/packages/server/modules/comments/domain/events.ts index fe2c99a4e..2ad0b65f5 100644 --- a/packages/server/modules/comments/domain/events.ts +++ b/packages/server/modules/comments/domain/events.ts @@ -11,6 +11,7 @@ export const commentEventsNamespace = 'comments' as const export const CommentEvents = { Created: `${commentEventsNamespace}.created`, Updated: `${commentEventsNamespace}.updated`, + Viewed: `${commentEventsNamespace}.viewed`, Archived: `${commentEventsNamespace}.archived` } as const @@ -25,6 +26,10 @@ export type CommentEventsPayloads = { previousComment: CommentRecord newComment: CommentRecord } + [CommentEvents.Viewed]: { + userId: string + commentId: string + } [CommentEvents.Archived]: { userId: string input: MutationCommentArchiveArgs diff --git a/packages/server/modules/comments/graph/resolvers/comments.ts b/packages/server/modules/comments/graph/resolvers/comments.ts index 6a0e8a4cd..4f8a1c03e 100644 --- a/packages/server/modules/comments/graph/resolvers/comments.ts +++ b/packages/server/modules/comments/graph/resolvers/comments.ts @@ -24,7 +24,7 @@ import { insertCommentLinksFactory, insertCommentsFactory, markCommentUpdatedFactory, - markCommentViewedFactory, + markCommentViewedFactory as markCommentViewedFactoryDb, resolvePaginatedProjectCommentsLatestModelResourcesFactory, updateCommentFactory } from '@/modules/comments/repositories/comments' @@ -56,7 +56,8 @@ import { createCommentThreadAndNotifyFactory, createCommentReplyAndNotifyFactory, editCommentAndNotifyFactory, - archiveCommentAndNotifyFactory + archiveCommentAndNotifyFactory, + markCommentViewedFactory } from '@/modules/comments/services/management' import { isLegacyData, @@ -533,8 +534,10 @@ export default { throwIfAuthNotOk(canReadProject) const projectDb = await getProjectDbClient({ projectId: args.input.projectId }) - const markCommentViewed = markCommentViewedFactory({ db: projectDb }) - await markCommentViewed(args.input.commentId, ctx.userId!) + await markCommentViewedFactory({ + markCommentViewed: markCommentViewedFactoryDb({ db: projectDb }), + emitEvent: getEventBus().emit + })(args.input.commentId, ctx.userId!) return true }, @@ -563,7 +566,7 @@ export default { }) const insertComments = insertCommentsFactory({ db: projectDb }) const insertCommentLinks = insertCommentLinksFactory({ db: projectDb }) - const markCommentViewed = markCommentViewedFactory({ db: projectDb }) + const markCommentViewed = markCommentViewedFactoryDb({ db: projectDb }) const createCommentThreadAndNotify = createCommentThreadAndNotifyFactory({ getViewerResourceItemsUngrouped, @@ -804,7 +807,7 @@ export default { insertComments: insertCommentsFactory({ db: projectDb }), insertCommentLinks: insertCommentLinksFactory({ db: projectDb }), deleteComment: deleteCommentFactory({ db: projectDb }), - markCommentViewed: markCommentViewedFactory({ db: projectDb }), + markCommentViewed: markCommentViewedFactoryDb({ db: projectDb }), emitEvent: getEventBus().emit, getViewerResourcesFromLegacyIdentifiers }) @@ -866,7 +869,7 @@ export default { throwIfAuthNotOk(canReadProject) const projectDb = await getProjectDbClient({ projectId: args.streamId }) - const markCommentViewed = markCommentViewedFactory({ db: projectDb }) + const markCommentViewed = markCommentViewedFactoryDb({ db: projectDb }) await markCommentViewed(args.commentId, context.userId!) return true diff --git a/packages/server/modules/comments/index.ts b/packages/server/modules/comments/index.ts index 7e7575a50..46faf8d34 100644 --- a/packages/server/modules/comments/index.ts +++ b/packages/server/modules/comments/index.ts @@ -11,7 +11,7 @@ import { getViewerResourcesForCommentsFactory, getViewerResourcesFromLegacyIdentifiersFactory } from '@/modules/core/services/commit/viewerResources' -import { publishNotification } from '@/modules/notifications/services/publication' +import { publishNotification } from '@/modules/notifications/services/publication/publishNotification' import type { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper' import { getEventBus } from '@/modules/shared/services/eventBus' import { publish } from '@/modules/shared/utils/subscriptions' diff --git a/packages/server/modules/comments/services/management.ts b/packages/server/modules/comments/services/management.ts index ec11f7b3b..9bb51a8ab 100644 --- a/packages/server/modules/comments/services/management.ts +++ b/packages/server/modules/comments/services/management.ts @@ -251,3 +251,21 @@ export const archiveCommentAndNotifyFactory = return updatedComment } + +export const markCommentViewedFactory = + (deps: { + markCommentViewed: MarkCommentViewed + emitEvent: EventBusEmit + }): MarkCommentViewed => + async (commentId: string, userId: string) => { + const updated = await deps.markCommentViewed(commentId, userId) + await deps.emitEvent({ + eventName: CommentEvents.Viewed, + payload: { + commentId, + userId + } + }) + + return updated + } diff --git a/packages/server/modules/comments/services/notifications.ts b/packages/server/modules/comments/services/notifications.ts index 6499cae90..090e56eec 100644 --- a/packages/server/modules/comments/services/notifications.ts +++ b/packages/server/modules/comments/services/notifications.ts @@ -1,10 +1,8 @@ import type { CommentRecord } from '@/modules/comments/helpers/types' import { ensureCommentSchema } from '@/modules/comments/services/commentTextService' -import type { JSONContent } from '@tiptap/core' -import { iterateContentNodes } from '@/modules/core/services/richTextEditorService' -import { difference, flatten } from 'lodash-es' +import { flatten } from 'lodash-es' import type { NotificationPublisher } from '@/modules/notifications/helpers/types' -import { NotificationType } from '@/modules/notifications/helpers/types' +import { NotificationType } from '@speckle/shared/notifications' import type { AddStreamCommentMentionActivity, SaveStreamActivity @@ -15,30 +13,7 @@ import { StreamActionTypes, StreamResourceTypes } from '@/modules/activitystream/helpers/types' - -function findMentionedUserIds(doc: JSONContent) { - const mentionedUserIds = new Set() - - for (const node of iterateContentNodes(doc)) { - if (node.type === 'mention') { - const uid = node.attrs?.id - if (uid) { - mentionedUserIds.add(uid) - } - } - } - - return [...mentionedUserIds] -} - -function collectMentionedUserIds(comment: CommentRecord): string[] { - if (!comment.text) return [] - - const { doc } = ensureCommentSchema(comment.text) - if (!doc) return [] - - return findMentionedUserIds(doc) -} +import { processCommentMentions } from '@/modules/notifications/services/events/handlers/createdOrUpdatedComment' /** * Save "user mentioned in stream comment" activity item @@ -105,20 +80,6 @@ const sendNotificationsForUsersFactory = ) } -const processCommentMentionsFactory = - (deps: SendNotificationsForUsersDeps) => - async (newComment: CommentRecord, previousComment?: CommentRecord) => { - const newMentionedUserIds = collectMentionedUserIds(newComment) - const previouslyMentionedUserIds = previousComment - ? collectMentionedUserIds(previousComment) - : [] - - const newMentions = difference(newMentionedUserIds, previouslyMentionedUserIds) - if (!newMentions.length) return - - await sendNotificationsForUsersFactory(deps)(newMentions, newComment) - } - /** * Hook into the comments lifecycle to generate notifications accordingly * @returns Callback to invoke when you wish to stop listening for comments events @@ -131,19 +92,22 @@ export const notifyUsersOnCommentEventsFactory = }) => async () => { const addStreamCommentMentionActivity = addStreamCommentMentionActivityFactory(deps) - const processCommentMentions = processCommentMentionsFactory({ + const sendNotificationsForUsers = sendNotificationsForUsersFactory({ ...deps, addStreamCommentMentionActivity }) const exitCbs = [ deps.eventBus.listen(CommentEvents.Created, async ({ payload: { comment } }) => { - await processCommentMentions(comment) + const newMentions = processCommentMentions(comment) + if (newMentions.length) await sendNotificationsForUsers(newMentions, comment) }), deps.eventBus.listen( CommentEvents.Updated, async ({ payload: { newComment, previousComment } }) => { - await processCommentMentions(newComment, previousComment) + const newMentions = processCommentMentions(newComment, previousComment) + if (newMentions.length) + await sendNotificationsForUsers(newMentions, newComment) } ) ] diff --git a/packages/server/modules/comments/tests/comments.spec.ts b/packages/server/modules/comments/tests/comments.spec.ts index dd288e179..2a57ac500 100644 --- a/packages/server/modules/comments/tests/comments.spec.ts +++ b/packages/server/modules/comments/tests/comments.spec.ts @@ -32,7 +32,7 @@ import { buildNotificationsStateTracker, purgeNotifications } from '@/test/notificationsHelper' -import { NotificationType } from '@/modules/notifications/helpers/types' +import { NotificationType } from '@speckle/shared/notifications' import type { ServerAndContext } from '@/test/graphqlHelper' import { createAuthedTestContext } from '@/test/graphqlHelper' import { @@ -98,6 +98,7 @@ import { import type { GetCommentsQueryVariables } from '@/modules/core/graph/generated/graphql' import type { BasicTestStream } from '@/test/speckle-helpers/streamHelper' import { createTestStream } from '@/test/speckle-helpers/streamHelper' +import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper' const getStream = getStreamFactory({ db }) const streamResourceCheck = streamResourceCheckFactory({ @@ -1818,28 +1819,32 @@ describe('Comments @comments', () => { ...input }) - it('a valid mention triggers a notification', async () => { - const { getSends } = emailListener.listen({ times: 2 }) + !isNotificationListenerEnabled() + ? it('a valid mention triggers a notification', async () => { + const { getSends } = emailListener.listen({ times: 2 }) - const waitForAck = notificationsState.waitForAck( - (e) => e.result?.type === NotificationType.MentionedInComment - ) + const waitForAck = notificationsState.waitForAck( + (e) => e.result?.type === NotificationType.MentionedInComment + ) - const { data, errors } = await createOrReplyCommentWithMention(otherUser.id) - const result = getResult(data) + const { data, errors } = await createOrReplyCommentWithMention( + otherUser.id + ) + const result = getResult(data) - expect(errors).to.be.not.ok - expect(result).to.be.ok + expect(errors).to.be.not.ok + expect(result).to.be.ok - // Wait for - await waitForAck + // Wait for + await waitForAck - const emailSends = getSends() - const emailParams = emailSends[0] - expect(emailParams).to.be.ok - expect(emailParams.subject).to.contain('mentioned in a Speckle comment') - expect(emailParams.to).to.eq(otherUser.email) - }) + const emailSends = getSends() + const emailParams = emailSends[0] + expect(emailParams).to.be.ok + expect(emailParams.subject).to.contain('mentioned in a Speckle comment') + expect(emailParams.to).to.eq(otherUser.email) + }) + : {} }) }) }) diff --git a/packages/server/modules/core/dbSchema.ts b/packages/server/modules/core/dbSchema.ts index 186ae84f7..4248c187f 100644 --- a/packages/server/modules/core/dbSchema.ts +++ b/packages/server/modules/core/dbSchema.ts @@ -478,6 +478,18 @@ export const StreamActivity = buildTableHelper('stream_activity', [ 'message' ]) +export const UserNotifications = buildTableHelper('user_notifications', [ + 'id', + 'userId', + 'type', + 'read', + 'version', + 'payload', + 'sendEmailAt', + 'createdAt', + 'updatedAt' +]) + export const UserNotificationPreferences = buildTableHelper( 'user_notification_preferences', ['userId', 'preferences'] diff --git a/packages/server/modules/core/graph/generated/graphql.ts b/packages/server/modules/core/graph/generated/graphql.ts index 592341009..55a38bbba 100644 --- a/packages/server/modules/core/graph/generated/graphql.ts +++ b/packages/server/modules/core/graph/generated/graphql.ts @@ -2091,6 +2091,7 @@ export type Mutation = { */ inviteResend: Scalars['Boolean']['output']; modelMutations: ModelMutations; + notificationMutations: NotificationMutations; /** @deprecated Part of the old API surface and will be removed in the future. */ objectCreate: Array; projectMutations: ProjectMutations; @@ -2484,6 +2485,39 @@ export type MutationWebhookUpdateArgs = { webhook: WebhookUpdateInput; }; +export type Notification = { + __typename?: 'Notification'; + createdAt: Scalars['DateTime']['output']; + id: Scalars['ID']['output']; + payload: Scalars['JSONObject']['output']; + read: Scalars['Boolean']['output']; + type: Scalars['String']['output']; + updatedAt: Scalars['DateTime']['output']; +}; + +export type NotificationMutations = { + __typename?: 'NotificationMutations'; + /** Delete an existing notification */ + bulkDelete: Scalars['Boolean']['output']; + /** update notidication */ + bulkUpdate: Scalars['Boolean']['output']; +}; + + +export type NotificationMutationsBulkDeleteArgs = { + ids: Array; +}; + + +export type NotificationMutationsBulkUpdateArgs = { + input: Array; +}; + +export type NotificationUpdateInput = { + id: Scalars['ID']['input']; + read: Scalars['Boolean']['input']; +}; + export type Object = { __typename?: 'Object'; /** @deprecated Not implemented. */ @@ -4971,6 +5005,8 @@ export type User = { meta: UserMeta; name: Scalars['String']['output']; notificationPreferences: Scalars['JSONObject']['output']; + /** List all notifications for the user */ + notifications: UserNotificationCollection; permissions: RootPermissionChecks; profiles?: Maybe; /** Get pending project access request, that the user made */ @@ -5056,6 +5092,16 @@ export type UserFavoriteStreamsArgs = { }; +/** + * Full user type, should only be used in the context of admin operations or + * when a user is reading/writing info about himself + */ +export type UserNotificationsArgs = { + cursor?: InputMaybe; + limit?: InputMaybe; +}; + + /** * Full user type, should only be used in the context of admin operations or * when a user is reading/writing info about himself @@ -5246,6 +5292,13 @@ export type UserMetaMutationsSetSpeckleConBannerDismissedArgs = { value: Scalars['Boolean']['input']; }; +export type UserNotificationCollection = { + __typename?: 'UserNotificationCollection'; + cursor?: Maybe; + items: Array; + totalCount: Scalars['Int']['output']; +}; + export type UserProjectCollection = { __typename?: 'UserProjectCollection'; cursor?: Maybe; @@ -6566,6 +6619,9 @@ export type ResolversTypes = { ModelsTreeItemCollection: ResolverTypeWrapper & { items: Array }>; MoveVersionsInput: MoveVersionsInput; Mutation: ResolverTypeWrapper<{}>; + Notification: ResolverTypeWrapper; + NotificationMutations: ResolverTypeWrapper; + NotificationUpdateInput: NotificationUpdateInput; Object: ResolverTypeWrapper; ObjectCollection: ResolverTypeWrapper & { objects: Array }>; ObjectCreateInput: ObjectCreateInput; @@ -6708,6 +6764,7 @@ export type ResolversTypes = { UserGendoAICredits: ResolverTypeWrapper; UserMeta: ResolverTypeWrapper; UserMetaMutations: ResolverTypeWrapper; + UserNotificationCollection: ResolverTypeWrapper; UserProjectCollection: ResolverTypeWrapper & { items: Array }>; UserProjectsFilter: UserProjectsFilter; UserProjectsUpdatedMessage: ResolverTypeWrapper & { project?: Maybe }>; @@ -6979,6 +7036,9 @@ export type ResolversParentTypes = { ModelsTreeItemCollection: Omit & { items: Array }; MoveVersionsInput: MoveVersionsInput; Mutation: {}; + Notification: Notification; + NotificationMutations: MutationsObjectGraphQLReturn; + NotificationUpdateInput: NotificationUpdateInput; Object: ObjectGraphQLReturn; ObjectCollection: Omit & { objects: Array }; ObjectCreateInput: ObjectCreateInput; @@ -7101,6 +7161,7 @@ export type ResolversParentTypes = { UserGendoAICredits: UserGendoAiCredits; UserMeta: UserMetaGraphQLReturn; UserMetaMutations: MutationsObjectGraphQLReturn; + UserNotificationCollection: UserNotificationCollection; UserProjectCollection: Omit & { items: Array }; UserProjectsFilter: UserProjectsFilter; UserProjectsUpdatedMessage: Omit & { project?: Maybe }; @@ -8123,6 +8184,7 @@ export type MutationResolvers>; inviteResend?: Resolver>; modelMutations?: Resolver; + notificationMutations?: Resolver; objectCreate?: Resolver, ParentType, ContextType, RequireFields>; projectMutations?: Resolver; requestVerification?: Resolver; @@ -8159,6 +8221,22 @@ export type MutationResolvers; }; +export type NotificationResolvers = { + createdAt?: Resolver; + id?: Resolver; + payload?: Resolver; + read?: Resolver; + type?: Resolver; + updatedAt?: Resolver; + __isTypeOf?: IsTypeOfResolverFn; +}; + +export type NotificationMutationsResolvers = { + bulkDelete?: Resolver>; + bulkUpdate?: Resolver>; + __isTypeOf?: IsTypeOfResolverFn; +}; + export type ObjectResolvers = { applicationId?: Resolver, ParentType, ContextType>; children?: Resolver>; @@ -8955,6 +9033,7 @@ export type UserResolvers; name?: Resolver; notificationPreferences?: Resolver; + notifications?: Resolver>; permissions?: Resolver; profiles?: Resolver, ParentType, ContextType>; projectAccessRequest?: Resolver, ParentType, ContextType, RequireFields>; @@ -9023,6 +9102,13 @@ export type UserMetaMutationsResolvers; }; +export type UserNotificationCollectionResolvers = { + cursor?: Resolver, ParentType, ContextType>; + items?: Resolver, ParentType, ContextType>; + totalCount?: Resolver; + __isTypeOf?: IsTypeOfResolverFn; +}; + export type UserProjectCollectionResolvers = { cursor?: Resolver, ParentType, ContextType>; items?: Resolver, ParentType, ContextType>; @@ -9541,6 +9627,8 @@ export type Resolvers = { ModelsTreeItem?: ModelsTreeItemResolvers; ModelsTreeItemCollection?: ModelsTreeItemCollectionResolvers; Mutation?: MutationResolvers; + Notification?: NotificationResolvers; + NotificationMutations?: NotificationMutationsResolvers; Object?: ObjectResolvers; ObjectCollection?: ObjectCollectionResolvers; PasswordStrengthCheckFeedback?: PasswordStrengthCheckFeedbackResolvers; @@ -9621,6 +9709,7 @@ export type Resolvers = { UserGendoAICredits?: UserGendoAiCreditsResolvers; UserMeta?: UserMetaResolvers; UserMetaMutations?: UserMetaMutationsResolvers; + UserNotificationCollection?: UserNotificationCollectionResolvers; UserProjectCollection?: UserProjectCollectionResolvers; UserProjectsUpdatedMessage?: UserProjectsUpdatedMessageResolvers; UserSearchResultCollection?: UserSearchResultCollectionResolvers; @@ -10637,6 +10726,28 @@ export type GetRegionalProjectBlobQueryVariables = Exact<{ export type GetRegionalProjectBlobQuery = { __typename?: 'Query', project: { __typename?: 'Project', id: string, blob?: { __typename?: 'BlobMetadata', id: string, fileName: string } | null } }; +export type GetUserNotificationsQueryVariables = Exact<{ + limit?: InputMaybe; + cursor?: InputMaybe; +}>; + + +export type GetUserNotificationsQuery = { __typename?: 'Query', activeUser?: { __typename?: 'User', notifications: { __typename?: 'UserNotificationCollection', cursor?: string | null, totalCount: number, items: Array<{ __typename?: 'Notification', id: string, type: string, createdAt: Date, payload: Record, read: boolean, updatedAt: Date }> } } | null }; + +export type UserBulkDeleteNotidicationMutationVariables = Exact<{ + ids: Array | Scalars['String']['input']; +}>; + + +export type UserBulkDeleteNotidicationMutation = { __typename?: 'Mutation', notificationMutations: { __typename?: 'NotificationMutations', bulkDelete: boolean } }; + +export type UserBulkUpdateNotificationsMutationVariables = Exact<{ + input: Array | NotificationUpdateInput; +}>; + + +export type UserBulkUpdateNotificationsMutation = { __typename?: 'Mutation', notificationMutations: { __typename?: 'NotificationMutations', bulkUpdate: boolean } }; + export type BasicProjectAccessRequestFieldsFragment = { __typename?: 'ProjectAccessRequest', id: string, requesterId: string, projectId: string, createdAt: Date, requester: { __typename?: 'LimitedUser', id: string, name: string } }; export type CreateProjectAccessRequestMutationVariables = Exact<{ @@ -11327,6 +11438,9 @@ export const GetRegionalProjectAutomationDocument = {"kind":"Document","definiti export const GetRegionalProjectCommentDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetRegionalProjectComment"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}},{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"commentId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"project"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"comment"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"commentId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}}]}}]}}]}}]} as unknown as DocumentNode; export const GetRegionalProjectWebhookDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetRegionalProjectWebhook"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}},{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"webhookId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"project"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"webhooks"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"webhookId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"items"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}}]}}]}}]}}]}}]} as unknown as DocumentNode; export const GetRegionalProjectBlobDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetRegionalProjectBlob"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}},{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"blobId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"project"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"blob"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"id"},"value":{"kind":"Variable","name":{"kind":"Name","value":"blobId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"fileName"}}]}}]}}]}}]} as unknown as DocumentNode; +export const GetUserNotificationsDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetUserNotifications"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"limit"}},"type":{"kind":"NamedType","name":{"kind":"Name","value":"Int"}}},{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"cursor"}},"type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"activeUser"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"notifications"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"limit"},"value":{"kind":"Variable","name":{"kind":"Name","value":"limit"}}},{"kind":"Argument","name":{"kind":"Name","value":"cursor"},"value":{"kind":"Variable","name":{"kind":"Name","value":"cursor"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"items"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"type"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"payload"}},{"kind":"Field","name":{"kind":"Name","value":"read"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}}]}},{"kind":"Field","name":{"kind":"Name","value":"cursor"}},{"kind":"Field","name":{"kind":"Name","value":"totalCount"}}]}}]}}]}}]} as unknown as DocumentNode; +export const UserBulkDeleteNotidicationDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"mutation","name":{"kind":"Name","value":"UserBulkDeleteNotidication"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"ids"}},"type":{"kind":"NonNullType","type":{"kind":"ListType","type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"notificationMutations"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"bulkDelete"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"ids"},"value":{"kind":"Variable","name":{"kind":"Name","value":"ids"}}}]}]}}]}}]} as unknown as DocumentNode; +export const UserBulkUpdateNotificationsDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"mutation","name":{"kind":"Name","value":"UserBulkUpdateNotifications"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"input"}},"type":{"kind":"NonNullType","type":{"kind":"ListType","type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"NotificationUpdateInput"}}}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"notificationMutations"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"bulkUpdate"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"input"},"value":{"kind":"Variable","name":{"kind":"Name","value":"input"}}}]}]}}]}}]} as unknown as DocumentNode; export const CreateProjectAccessRequestDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"mutation","name":{"kind":"Name","value":"CreateProjectAccessRequest"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"projectMutations"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"accessRequestMutations"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"create"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"projectId"},"value":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"FragmentSpread","name":{"kind":"Name","value":"BasicProjectAccessRequestFields"}}]}}]}}]}}]}},{"kind":"FragmentDefinition","name":{"kind":"Name","value":"BasicProjectAccessRequestFields"},"typeCondition":{"kind":"NamedType","name":{"kind":"Name","value":"ProjectAccessRequest"}},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"requester"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"name"}}]}},{"kind":"Field","name":{"kind":"Name","value":"requesterId"}},{"kind":"Field","name":{"kind":"Name","value":"projectId"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}}]}}]} as unknown as DocumentNode; export const GetActiveUserProjectAccessRequestDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetActiveUserProjectAccessRequest"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"activeUser"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"projectAccessRequest"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"projectId"},"value":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"FragmentSpread","name":{"kind":"Name","value":"BasicProjectAccessRequestFields"}}]}}]}}]}},{"kind":"FragmentDefinition","name":{"kind":"Name","value":"BasicProjectAccessRequestFields"},"typeCondition":{"kind":"NamedType","name":{"kind":"Name","value":"ProjectAccessRequest"}},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"requester"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"name"}}]}},{"kind":"Field","name":{"kind":"Name","value":"requesterId"}},{"kind":"Field","name":{"kind":"Name","value":"projectId"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}}]}}]} as unknown as DocumentNode; export const GetActiveUserFullProjectAccessRequestDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetActiveUserFullProjectAccessRequest"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"String"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"activeUser"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"projectAccessRequest"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"projectId"},"value":{"kind":"Variable","name":{"kind":"Name","value":"projectId"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"FragmentSpread","name":{"kind":"Name","value":"BasicProjectAccessRequestFields"}},{"kind":"Field","name":{"kind":"Name","value":"project"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"name"}}]}}]}}]}}]}},{"kind":"FragmentDefinition","name":{"kind":"Name","value":"BasicProjectAccessRequestFields"},"typeCondition":{"kind":"NamedType","name":{"kind":"Name","value":"ProjectAccessRequest"}},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"requester"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"name"}}]}},{"kind":"Field","name":{"kind":"Name","value":"requesterId"}},{"kind":"Field","name":{"kind":"Name","value":"projectId"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}}]}}]} as unknown as DocumentNode; diff --git a/packages/server/modules/core/scopes.ts b/packages/server/modules/core/scopes.ts index 1339a9a87..bd596a926 100644 --- a/packages/server/modules/core/scopes.ts +++ b/packages/server/modules/core/scopes.ts @@ -33,6 +33,16 @@ export default [ description: "Read other users' profiles.", public: true }, + { + name: Scopes.Notifications.Read, + description: 'Required to see your notifications.', + public: true + }, + { + name: Scopes.Notifications.Write, + description: 'Required to update your notifications.', + public: true + }, { name: Scopes.Server.Stats, description: diff --git a/packages/server/modules/notifications/domain/operations.ts b/packages/server/modules/notifications/domain/operations.ts index a0dec84a6..8affeb451 100644 --- a/packages/server/modules/notifications/domain/operations.ts +++ b/packages/server/modules/notifications/domain/operations.ts @@ -1,4 +1,12 @@ -import type { NotificationPreferences } from '@/modules/notifications/helpers/types' +import type { + BaseUserNotification, + NotificationChannel, + NotificationPreferences, + UserNotificationRecord +} from '@/modules/notifications/helpers/types' +import type { MaybeNullOrUndefined } from '@speckle/shared' +import type { NotificationType } from '@speckle/shared/notifications' +import type { Exact } from 'type-fest' export type GetSavedUserNotificationPreferences = ( userId: string @@ -12,3 +20,43 @@ export type SaveUserNotificationPreferences = ( export type GetUserNotificationPreferences = ( userId: string ) => Promise + +export type GetUserPreferenceForNotificationType = ( + userId: string, + notificationType: NotificationType, + notificationChannel: NotificationChannel +) => Promise + +export type GetUserNotifications = (args: { + userId: string + cursor: string | null + limit: number | null +}) => Promise<{ items: BaseUserNotification[]; cursor: string | null }> + +export type GetUserNotificationsCount = (args: { userId: string }) => Promise + +export type GetNextEmailNotification = () => Promise< + MaybeNullOrUndefined +> + +export type MarkCommentNotificationAsRead = (args: { + userId: string + commentId: string +}) => Promise + +export type DeleteUserNotifications = (args: { + userId: string + ids: string[] +}) => Promise + +export type StoreUserNotifications = < + Notification extends Exact +>( + notifications: Notification[] +) => Promise + +export type UpdateUserNotification = (args: { + id: string + userId: string + update: Partial> +}) => Promise diff --git a/packages/server/modules/notifications/events/notificationListener.ts b/packages/server/modules/notifications/events/notificationListener.ts new file mode 100644 index 000000000..8073d219c --- /dev/null +++ b/packages/server/modules/notifications/events/notificationListener.ts @@ -0,0 +1,55 @@ +import { CommentEvents } from '@/modules/comments/domain/events' +import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper' +import type { EventBus, EventPayload } from '@/modules/shared/services/eventBus' +import { publishEventMessage } from '@/modules/notifications/services/events/queue' +import { AccessRequestEvents } from '@/modules/accessrequests/domain/events' +import type { Logger } from '@/observability/logging' +import { markCommentNotificationsAsReadFactory } from '@/modules/notifications/repositories/userNotification' +import type { MarkCommentNotificationAsRead } from '@/modules/notifications/domain/operations' +import { db } from '@/db/knex' + +export type NotificationEvents = EventPayload< + | typeof CommentEvents.Created + | typeof CommentEvents.Updated + | typeof AccessRequestEvents.Created + | typeof AccessRequestEvents.Finalized +> + +const onEventTriggersNotificationFactory = + ({ logger }: { logger: Logger }) => + async (event: NotificationEvents) => { + if (!isNotificationListenerEnabled()) return + + logger.debug('Notification triggered for event', event) + + await publishEventMessage(event) + } + +const onCommentViewedFactory = + (deps: { markCommentNotificationsAsRead: MarkCommentNotificationAsRead }) => + async (event: EventPayload) => { + if (!isNotificationListenerEnabled()) return + + await deps.markCommentNotificationsAsRead(event.payload) + } + +export const notificationListenersFactory = + (deps: { eventBus: EventBus; logger: Logger }) => () => { + const onEventTriggersNotification = onEventTriggersNotificationFactory({ + logger: deps.logger + }) + + const onCommentViewed = onCommentViewedFactory({ + markCommentNotificationsAsRead: markCommentNotificationsAsReadFactory({ db }) + }) + + const cbs = [ + deps.eventBus.listen(CommentEvents.Created, onEventTriggersNotification), + deps.eventBus.listen(CommentEvents.Updated, onEventTriggersNotification), + deps.eventBus.listen(AccessRequestEvents.Created, onEventTriggersNotification), + deps.eventBus.listen(AccessRequestEvents.Finalized, onEventTriggersNotification), + deps.eventBus.listen(CommentEvents.Viewed, onCommentViewed) + ] + + return () => cbs.forEach((cb) => cb()) + } diff --git a/packages/server/modules/notifications/graph/resolvers/userNotificationPreferences.ts b/packages/server/modules/notifications/graph/resolvers/userNotificationPreferences.ts index 6a56f3aa4..5a6764073 100644 --- a/packages/server/modules/notifications/graph/resolvers/userNotificationPreferences.ts +++ b/packages/server/modules/notifications/graph/resolvers/userNotificationPreferences.ts @@ -3,7 +3,7 @@ import type { Resolvers } from '@/modules/core/graph/generated/graphql' import { getSavedUserNotificationPreferencesFactory, saveUserNotificationPreferencesFactory -} from '@/modules/notifications/repositories' +} from '@/modules/notifications/repositories/userNotificationPreferences' import { getUserNotificationPreferencesFactory, updateNotificationPreferencesFactory @@ -30,7 +30,7 @@ export default { Mutation: { async userNotificationPreferencesUpdate(_parent, args, context) { const logger = context.log - await await withOperationLogging( + await withOperationLogging( async () => updateNotificationPreferences(context.userId!, args.preferences), { logger, diff --git a/packages/server/modules/notifications/graph/resolvers/userNotifications.ts b/packages/server/modules/notifications/graph/resolvers/userNotifications.ts new file mode 100644 index 000000000..ff11238a7 --- /dev/null +++ b/packages/server/modules/notifications/graph/resolvers/userNotifications.ts @@ -0,0 +1,98 @@ +import { db } from '@/db/knex' +import type { Resolvers } from '@/modules/core/graph/generated/graphql' +import { parseNotificationToLatestVersion } from '@/modules/notifications/helpers/toLatestVersion' +import { + deleteUserNotificationsFactory, + getUserNotificationsCountFactory, + getUserNotificationsFactory, + updateUserNotificationFactory +} from '@/modules/notifications/repositories/userNotification' +import { asOperation } from '@/modules/shared/command' +import { chunk } from 'lodash-es' + +const getUserNotifications = getUserNotificationsFactory({ db }) +const deleteUserNotifications = deleteUserNotificationsFactory({ db }) +const updateUserNotification = updateUserNotificationFactory({ db }) +const getUserNotificationsCount = getUserNotificationsCountFactory({ db }) + +const resolvers: Resolvers = { + User: { + async notifications(parent, args) { + const [totalCount, { items, cursor }] = await Promise.all([ + await getUserNotificationsCount({ userId: parent.id }), + await getUserNotifications({ + userId: parent.id, + cursor: args.cursor || null, + limit: args.limit || null + }) + ]) + + return { + totalCount, + cursor, + items: items.map(parseNotificationToLatestVersion) + } + } + }, + Mutation: { + notificationMutations: () => ({}) + }, + NotificationMutations: { + async bulkDelete(_parent, { ids }, context) { + await asOperation( + async () => { + await deleteUserNotifications({ + userId: context.userId!, + ids + }) + }, + { + logger: context.log, + name: 'userNotificationPreferencesUpdate', + description: 'deleting user notifications' + } + ) + + return true + }, + async bulkUpdate(_parent, args, context) { + await asOperation( + async () => { + const inputBatches = chunk(args.input, 10) + for (const batch of inputBatches) { + await Promise.all( + batch.map(({ id, read }) => { + let update = {} + if (read === false) { + update = { + read: false, + sendEmailAt: null + } + } else { + update = { + read: true + } + } + + return updateUserNotification({ + userId: context.userId!, + id, + update + }) + }) + ) + } + }, + { + logger: context.log, + name: 'userNotificationPreferencesUpdate', + description: 'marking user notifications as read' + } + ) + + return true + } + } +} + +export default resolvers diff --git a/packages/server/modules/notifications/helpers/toLatestVersion.ts b/packages/server/modules/notifications/helpers/toLatestVersion.ts new file mode 100644 index 000000000..e7fa5ccba --- /dev/null +++ b/packages/server/modules/notifications/helpers/toLatestVersion.ts @@ -0,0 +1,45 @@ +import type { + BaseUserNotification, + UserNotificationRecord +} from '@/modules/notifications/helpers/types' +import { LatestNotificationVersions } from '@/modules/notifications/helpers/types' +import { logger } from '@/observability/logging' +import type { Nullable } from '@speckle/shared' + +export const ensureNotificationToLatestVersion = ( + notification: BaseUserNotification +): Nullable => { + const latestVersion = LatestNotificationVersions[notification.type] + + if (notification.version === latestVersion) { + return notification as UserNotificationRecord + } + + logger.error( + { + notification, + latestVersion + }, + 'No notification backward compatibility was configured' + ) + return null +} + +export const parseNotificationToLatestVersion = ( + notification: BaseUserNotification +): UserNotificationRecord => { + const latestVersion = LatestNotificationVersions[notification.type] + + if (notification.version === latestVersion) { + return notification as UserNotificationRecord + } + + logger.warn( + { + notification, + latestVersion + }, + 'No notification backward compatibility was configured' + ) + return notification as UserNotificationRecord +} diff --git a/packages/server/modules/notifications/helpers/types.ts b/packages/server/modules/notifications/helpers/types.ts index 3e5c7926f..6a566886e 100644 --- a/packages/server/modules/notifications/helpers/types.ts +++ b/packages/server/modules/notifications/helpers/types.ts @@ -4,13 +4,8 @@ import type { MaybeAsync, Optional } from '@/modules/shared/helpers/typeHelper' import type { Job } from 'bull' import { isObject, has } from 'lodash-es' import type { Logger } from 'pino' - -export enum NotificationType { - ActivityDigest = 'activityDigest', - MentionedInComment = 'mentionedInComment', - NewStreamAccessRequest = 'newStreamAccessRequest', - StreamAccessRequestApproved = 'streamAccessRequestApproved' -} +import type { NotificationPayloadMap } from '@speckle/shared/notifications' +import { NotificationType } from '@speckle/shared/notifications' export enum NotificationChannel { Email = 'email' @@ -25,6 +20,37 @@ export type UserNotificationPreferencesRecord = { preferences: NotificationPreferences } +export type BaseUserNotification = { + id: string + userId: string + type: NotificationType + read: boolean + version: string + payload: object + sendEmailAt: Date | null + createdAt: Date + updatedAt: Date +} + +export type UserNotificationRecord = { + [K in keyof NotificationPayloadMap]: Omit< + BaseUserNotification, + 'payload' | 'version' | 'type' + > & { + type: K + version: (typeof LatestNotificationVersions)[K] + payload: NotificationPayloadMap[K] + } +}[keyof NotificationPayloadMap] + +const DEFAULT_VERSION = '1' as const +export const LatestNotificationVersions = { + [NotificationType.MentionedInComment]: DEFAULT_VERSION, + [NotificationType.NewStreamAccessRequest]: DEFAULT_VERSION, + [NotificationType.StreamAccessRequestApproved]: DEFAULT_VERSION, + [NotificationType.ActivityDigest]: DEFAULT_VERSION +} + // Add mappings between NotificationTypes and expected Message types here export type NotificationTypeMessageMap = { [NotificationType.MentionedInComment]: MentionedInCommentMessage diff --git a/packages/server/modules/notifications/index.ts b/packages/server/modules/notifications/index.ts index 9723b409f..6fada2018 100644 --- a/packages/server/modules/notifications/index.ts +++ b/packages/server/modules/notifications/index.ts @@ -1,20 +1,31 @@ import { - initializeQueue, + initializePublicationQueue, consumeIncomingNotifications, registerNotificationHandlers, - shutdownQueue -} from '@/modules/notifications/services/queue' + shutdownPublicationQueue +} from '@/modules/notifications/services/publication/queue' import type { NotificationTypeHandlers } from '@/modules/notifications/helpers/types' -import { NotificationType } from '@/modules/notifications/helpers/types' import type { SpeckleModule } from '@/modules/shared/helpers/typeHelper' import { shouldDisableNotificationsConsumption } from '@/modules/shared/helpers/envHelper' -import { moduleLogger } from '@/observability/logging' -import MentionedInCommentHandler from '@/modules/notifications/services/handlers/mentionedInComment' -import NewStreamAccessRequestHandler from '@/modules/notifications/services/handlers/newStreamAccessRequest' -import StreamAccessRequestApprovedHandler from '@/modules/notifications/services/handlers/streamAccessRequestApproved' -import ActivityDigestHandler from '@/modules/notifications/services/handlers/activityDigest' +import { moduleLogger, notificationsLogger } from '@/observability/logging' +import MentionedInCommentHandler from '@/modules/notifications/services/publication/handlers/mentionedInComment' +import NewStreamAccessRequestHandler from '@/modules/notifications/services/publication/handlers/newStreamAccessRequest' +import StreamAccessRequestApprovedHandler from '@/modules/notifications/services/publication/handlers/streamAccessRequestApproved' +import ActivityDigestHandler from '@/modules/notifications/services/publication/handlers/activityDigest' +import { + initializeNotificationEventsConsumption, + initializeNotificationEventsQueue, + shutdownEventQueue +} from '@/modules/notifications/services/events/queue' +import { notificationListenersFactory } from '@/modules/notifications/events/notificationListener' +import { getEventBus } from '@/modules/shared/services/eventBus' +import { scheduleDelayedEmailNotifications } from '@/modules/notifications/tasks/delayedNotifications' +import type cron from 'node-cron' +import { NotificationType } from '@speckle/shared/notifications' -export async function initializeConsumption( +let scheduledTasks: cron.ScheduledTask[] = [] + +export async function initializePublicationConsumption( customHandlers?: Partial ) { moduleLogger.info('📞 Initializing notification queue consumption...') @@ -28,7 +39,7 @@ export async function initializeConsumption( registerNotificationHandlers(customHandlers || allHandlers) - await initializeQueue() + await initializePublicationQueue() if (shouldDisableNotificationsConsumption()) { moduleLogger.info('Skipping notification consumption...') @@ -40,10 +51,24 @@ export async function initializeConsumption( export const init: SpeckleModule['init'] = async ({ isInitial }) => { moduleLogger.info('📞 Init notifications module') if (isInitial) { - await initializeConsumption() + await initializePublicationConsumption() + + await initializeNotificationEventsQueue() + await initializeNotificationEventsConsumption() + + notificationListenersFactory({ + eventBus: getEventBus(), + logger: notificationsLogger + })() + + scheduledTasks = [await scheduleDelayedEmailNotifications()] } } export const shutdown: SpeckleModule['shutdown'] = async () => { - await shutdownQueue() + await shutdownPublicationQueue() + await shutdownEventQueue() + scheduledTasks.forEach((task) => { + task.stop() + }) } diff --git a/packages/server/modules/notifications/migrations/20250915154600_user_notifications.ts b/packages/server/modules/notifications/migrations/20250915154600_user_notifications.ts new file mode 100644 index 000000000..3d6eb324b --- /dev/null +++ b/packages/server/modules/notifications/migrations/20250915154600_user_notifications.ts @@ -0,0 +1,28 @@ +import type { Knex } from 'knex' + +const TABLE_NAME = 'user_notifications' + +export async function up(knex: Knex): Promise { + await knex.schema.createTable(TABLE_NAME, (table) => { + table.string('id').primary() + table.string('type').notNullable() + table.boolean('read').notNullable() + table + .string('userId', 10) + .primary() + .references('id') + .inTable('users') + .onDelete('cascade') + table.jsonb('payload').notNullable() + table.string('version').notNullable() + table.timestamp('sendEmailAt', { precision: 3, useTz: true }).nullable() + table.timestamp('createdAt', { precision: 3, useTz: true }).defaultTo(knex.fn.now()) + table.timestamp('updatedAt', { precision: 3, useTz: true }).defaultTo(knex.fn.now()) + + table.index(['userId', 'createdAt']) + }) +} + +export async function down(knex: Knex): Promise { + await knex.schema.dropTableIfExists(TABLE_NAME) +} diff --git a/packages/server/modules/notifications/repositories/userNotification.ts b/packages/server/modules/notifications/repositories/userNotification.ts new file mode 100644 index 000000000..ac7386831 --- /dev/null +++ b/packages/server/modules/notifications/repositories/userNotification.ts @@ -0,0 +1,126 @@ +import { UserNotifications } from '@/modules/core/dbSchema' +import type { + DeleteUserNotifications, + GetNextEmailNotification, + GetUserNotifications, + GetUserNotificationsCount, + MarkCommentNotificationAsRead, + StoreUserNotifications, + UpdateUserNotification +} from '@/modules/notifications/domain/operations' +import { type UserNotificationRecord } from '@/modules/notifications/helpers/types' +import { compositeCursorTools } from '@/modules/shared/helpers/dbHelper' +import { isNullOrUndefined } from '@speckle/shared' +import { NotificationType } from '@speckle/shared/notifications' +import { type Knex } from 'knex' +import { clamp, pick } from 'lodash-es' + +const tables = { + userNotifications: (db: Knex) => db(UserNotifications.name) +} + +const getCursorTools = () => + compositeCursorTools({ + schema: UserNotifications, + cols: ['createdAt', 'id'] + }) + +export const getUserNotificationsFactory = + (deps: { db: Knex }): GetUserNotifications => + async (args) => { + const limit = clamp(isNullOrUndefined(args.limit) ? 10 : args.limit, 0, 50) + const { applyCursorSortAndFilter, resolveNewCursor } = getCursorTools() + + const q = tables + .userNotifications(deps.db) + .where({ userId: args.userId }) + .limit(limit) + + applyCursorSortAndFilter({ + query: q, + cursor: args.cursor + }) + + const items = await q + const newCursor = resolveNewCursor(items) + + return { + items, + cursor: newCursor + } + } + +export const getUserNotificationsCountFactory = + (deps: { db: Knex }): GetUserNotificationsCount => + async ({ userId }) => { + const [res] = await tables.userNotifications(deps.db).where({ userId }).count() + + return parseInt(res.count.toString()) + } + +export const storeUserNotificationsFactory = + (deps: { db: Knex }): StoreUserNotifications => + async (args) => { + const notifications = await deps.db(UserNotifications.name).insert(args) + + return notifications.length + } + +export const updateUserNotificationFactory = + (deps: { db: Knex }): UpdateUserNotification => + async ({ userId, id, update }) => { + const [notification] = await tables + .userNotifications(deps.db) + .where({ userId, id }) + .update( + pick( + update, + UserNotifications.withoutTablePrefix.cols + ) as Partial + ) + .returning('*') + + return notification + } + +/** + * This function retrieves the next email notification for a user. + * If used from different transactions, it will skip locked rows, and if there is a notification it locks it for the transaction context. + * This means that two processes querying for the next email notification (if both use a transaction to send the email etc) will not pick up the same notification. + */ +export const getNextEmailNotificationFactory = + (deps: { db: Knex }): GetNextEmailNotification => + async () => { + const notification = await tables + .userNotifications(deps.db) + .where(UserNotifications.col.sendEmailAt, '<=', new Date()) + .andWhere(UserNotifications.col.read, false) + .forUpdate() + .skipLocked() + .first() + + return notification + } + +export const deleteUserNotificationsFactory = + (deps: { db: Knex }): DeleteUserNotifications => + async ({ userId, ids }) => { + await tables + .userNotifications(deps.db) + .where({ userId }) + .whereIn(UserNotifications.col.id, ids) + .delete() + } + +export const markCommentNotificationsAsReadFactory = + (deps: { db: Knex }): MarkCommentNotificationAsRead => + async ({ userId, commentId }) => { + const rows = await deps + .db(UserNotifications.name) + .where({ userId }) + .andWhere(UserNotifications.col.type, NotificationType.MentionedInComment) + .whereJsonSupersetOf(UserNotifications.col.payload, { commentId }) + .update({ read: true }) + + return rows + } diff --git a/packages/server/modules/notifications/repositories.ts b/packages/server/modules/notifications/repositories/userNotificationPreferences.ts similarity index 100% rename from packages/server/modules/notifications/repositories.ts rename to packages/server/modules/notifications/repositories/userNotificationPreferences.ts diff --git a/packages/server/modules/notifications/services/events/handlers/createdOrUpdatedComment.ts b/packages/server/modules/notifications/services/events/handlers/createdOrUpdatedComment.ts new file mode 100644 index 000000000..f29f803ba --- /dev/null +++ b/packages/server/modules/notifications/services/events/handlers/createdOrUpdatedComment.ts @@ -0,0 +1,251 @@ +import { db } from '@/db/knex' +import type { GetComment } from '@/modules/comments/domain/operations' +import type { ExtendedComment } from '@/modules/comments/domain/types' +import type { CommentRecord } from '@/modules/comments/helpers/types' +import { getCommentFactory } from '@/modules/comments/repositories/comments' +import { ensureCommentSchema } from '@/modules/comments/services/commentTextService' +import type { GetServerInfo } from '@/modules/core/domain/server/operations' +import type { GetStream } from '@/modules/core/domain/streams/operations' +import type { StreamWithOptionalRole } from '@/modules/core/domain/streams/types' +import type { GetUser } from '@/modules/core/domain/users/operations' +import { Roles } from '@/modules/core/helpers/mainConstants' +import type { ServerInfo } from '@/modules/core/helpers/types' +import { getServerInfoFactory } from '@/modules/core/repositories/server' +import { getStreamFactory } from '@/modules/core/repositories/streams' +import type { UserWithOptionalRole } from '@/modules/core/repositories/users' +import { getUserFactory } from '@/modules/core/repositories/users' +import { iterateContentNodes } from '@/modules/core/services/richTextEditorService' +import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' +import type { + GetUserPreferenceForNotificationType, + StoreUserNotifications +} from '@/modules/notifications/domain/operations' +import { NotificationValidationError } from '@/modules/notifications/errors' +import { NotificationChannel } from '@/modules/notifications/helpers/types' +import { storeUserNotificationsFactory } from '@/modules/notifications/repositories/userNotification' +import type { MaybeFalsy, Nullable } from '@/modules/shared/helpers/typeHelper' +import type { EventType } from '@/modules/shared/services/eventBus' +import type { JSONContent } from '@tiptap/core' +import cryptoRandomString from 'crypto-random-string' +import type { Knex } from 'knex' +import { difference } from 'lodash-es' +import { getUserPreferenceForNotificationTypeFactory } from '@/modules/notifications/services/notificationPreferences' +import { getSavedUserNotificationPreferencesFactory } from '@/modules/notifications/repositories/userNotificationPreferences' +import { NotificationType } from '@speckle/shared/notifications' + +type ValidatedNotificationState = { + targetUser: UserWithOptionalRole + author: UserWithOptionalRole + stream: StreamWithOptionalRole + threadComment: ExtendedComment + mentionComment: ExtendedComment + commitOrObjectId: { commitId: Nullable; objectId: Nullable } + serverInfo: ServerInfo +} + +function findMentionedUserIds(doc: JSONContent) { + const mentionedUserIds = new Set() + + for (const node of iterateContentNodes(doc)) { + if (node.type === 'mention') { + const uid = node.attrs?.id + if (uid) { + mentionedUserIds.add(uid) + } + } + } + + return [...mentionedUserIds] +} + +function collectMentionedUserIds(comment: CommentRecord): string[] { + if (!comment.text) return [] + + const { doc } = ensureCommentSchema(comment.text) + if (!doc) return [] + + return findMentionedUserIds(doc) +} + +export const processCommentMentions = ( + newComment: CommentRecord, + previousComment?: CommentRecord +) => { + const newMentionedUserIds = collectMentionedUserIds(newComment) + const previouslyMentionedUserIds = previousComment + ? collectMentionedUserIds(previousComment) + : [] + + return difference(newMentionedUserIds, previouslyMentionedUserIds) +} + +export function validateCommentNotification(state: { + targetUser: MaybeFalsy + author: MaybeFalsy + stream: MaybeFalsy + threadComment: MaybeFalsy + mentionComment: MaybeFalsy + serverInfo: ServerInfo +}): ValidatedNotificationState { + const { targetUser, author, stream, threadComment, mentionComment, serverInfo } = + state + + if ( + !targetUser || + targetUser.role === Roles.Server.ArchivedUser || + !targetUser.email + ) { + throw new NotificationValidationError('Invalid mention target user') + } + + if (!author || author.role === Roles.Server.ArchivedUser) { + throw new NotificationValidationError('Invalid mention author user') + } + + if (!stream) { + throw new NotificationValidationError('Invalid mention stream') + } + + if (!threadComment || threadComment.streamId !== stream.id) { + throw new NotificationValidationError('Invalid mention thread comment') + } + + if (!mentionComment || mentionComment.streamId !== stream.id) { + throw new NotificationValidationError('Invalid mention comment') + } + + const commitOrObjectResource = threadComment.resources.find((r) => + ['commit', 'object'].includes(r.resourceType) + ) + if (!commitOrObjectResource) { + // This will only happen if threadComment is actually a reply, so if the notification + // was emitted with wrong parameters + throw new NotificationValidationError( + "Couldn't resolve the comment's associated resource - the comment might be a reply" + ) + } + + const commitId = + commitOrObjectResource.resourceType === 'commit' + ? commitOrObjectResource.resourceId + : null + const objectId = + commitOrObjectResource.resourceType === 'object' + ? commitOrObjectResource.resourceId + : null + + return { + targetUser, + author, + stream, + threadComment, + mentionComment, + commitOrObjectId: { commitId, objectId }, + serverInfo + } +} + +/** + * Notification that is triggered when a user is mentioned in a comment + */ +const createdOrUpdatedCommentHandlerFactory = + (deps: { + getUser: GetUser + getStream: GetStream + getCommentResolver: (deps: { projectDb: Knex }) => GetComment + getServerInfo: GetServerInfo + saveUserNotifications: StoreUserNotifications + getUserPreferenceForNotificationType: GetUserPreferenceForNotificationType + }) => + async ({ payload }: EventType<'comments.created' | 'comments.updated'>) => { + const mentionedUserIds = + 'comment' in payload + ? processCommentMentions(payload.comment) + : processCommentMentions(payload.newComment, payload.previousComment) + + for (const targetUserId of mentionedUserIds) { + const { + authorId, + streamId, + id: commentId, + parentComment + } = 'comment' in payload ? payload.comment : payload.newComment + + // do not notify yourself + if (authorId === targetUserId) continue + + const threadId = parentComment || commentId + const isCommentAndThreadTheSame = threadId === commentId + const projectDb = await getProjectDbClient({ projectId: streamId }) + const getComment = deps.getCommentResolver({ projectDb }) + + const [targetUser, author, stream, threadComment, comment, serverInfo] = + await Promise.all([ + deps.getUser(targetUserId), + deps.getUser(authorId), + deps.getStream({ streamId }), + getComment({ id: threadId }), + isCommentAndThreadTheSame ? null : getComment({ id: commentId }), + deps.getServerInfo() + ]) + + const mentionComment = isCommentAndThreadTheSame ? threadComment : comment + + // Validate message + const state = validateCommentNotification({ + targetUser, + author, + stream, + threadComment, + mentionComment, + serverInfo + }) + + const isSubscribedToEmail = await deps.getUserPreferenceForNotificationType( + state.targetUser.id, + NotificationType.MentionedInComment, + NotificationChannel.Email + ) + + const now = new Date() + await deps.saveUserNotifications([ + { + id: cryptoRandomString({ length: 10 }), + userId: state.targetUser.id, + type: NotificationType.MentionedInComment, + read: false, + version: '1', + payload: { + threadId: state.threadComment.id, + authorId: state.author.id, + commentId: state.mentionComment.id, + streamId: state.stream.id + }, + sendEmailAt: isSubscribedToEmail ? now : null, + createdAt: now, + updatedAt: now + } + ]) + } + } + +export const handler = async ( + event: EventType<'comments.created' | 'comments.updated'> +) => { + const createdOrUpdatedCommentHandler = createdOrUpdatedCommentHandlerFactory({ + getUser: getUserFactory({ db }), + getStream: getStreamFactory({ db }), + getCommentResolver: ({ projectDb }) => getCommentFactory({ db: projectDb }), + getServerInfo: getServerInfoFactory({ db }), + saveUserNotifications: storeUserNotificationsFactory({ db }), + getUserPreferenceForNotificationType: getUserPreferenceForNotificationTypeFactory({ + getSavedUserNotificationPreferences: getSavedUserNotificationPreferencesFactory({ + db + }) + }) + }) + + return createdOrUpdatedCommentHandler(event) +} + +export default handler diff --git a/packages/server/modules/notifications/services/events/handlers/streamAccessRequestCreated.ts b/packages/server/modules/notifications/services/events/handlers/streamAccessRequestCreated.ts new file mode 100644 index 000000000..67c98e38e --- /dev/null +++ b/packages/server/modules/notifications/services/events/handlers/streamAccessRequestCreated.ts @@ -0,0 +1,157 @@ +import { + AccessRequestType, + getPendingAccessRequestFactory +} from '@/modules/accessrequests/repositories' +import { NotificationValidationError } from '@/modules/notifications/errors' +import { Roles } from '@/modules/core/helpers/mainConstants' +import { sendEmail } from '@/modules/emails/services/sending' +import { renderEmail } from '@/modules/emails/services/emailRendering' +import { db } from '@/db/knex' +import type { GetPendingAccessRequest } from '@/modules/accessrequests/domain/operations' +import type { + GetStream, + GetStreamCollaborators +} from '@/modules/core/domain/streams/operations' +import { + getStreamCollaboratorsFactory, + getStreamFactory +} from '@/modules/core/repositories/streams' +import type { GetUser } from '@/modules/core/domain/users/operations' +import { getUserFactory } from '@/modules/core/repositories/users' +import type { GetServerInfo } from '@/modules/core/domain/server/operations' +import { getServerInfoFactory } from '@/modules/core/repositories/server' +import type { EventBusPayloads, EventType } from '@/modules/shared/services/eventBus' +import type { + GetUserPreferenceForNotificationType, + StoreUserNotifications +} from '@/modules/notifications/domain/operations' +import type { UserNotificationRecord } from '@/modules/notifications/helpers/types' +import { NotificationChannel } from '@/modules/notifications/helpers/types' +import cryptoRandomString from 'crypto-random-string' +import { storeUserNotificationsFactory } from '@/modules/notifications/repositories/userNotification' +import { getUserPreferenceForNotificationTypeFactory } from '@/modules/notifications/services/notificationPreferences' +import { getSavedUserNotificationPreferencesFactory } from '@/modules/notifications/repositories/userNotificationPreferences' +import { NotificationType } from '@speckle/shared/notifications' + +type ValidateMessageDeps = { + getPendingAccessRequest: GetPendingAccessRequest + getUser: GetUser + getStream: GetStream + getStreamCollaborators: GetStreamCollaborators +} + +const validateMessageFactory = + (deps: ValidateMessageDeps) => + async ({ payload }: { payload: EventBusPayloads['accessrequests.created'] }) => { + const { + request: { id: requestId, resourceId: streamId } + } = payload + + if (!streamId) throw new NotificationValidationError('No stream ID provided') + + const stream = await deps.getStream({ streamId }) + if (!stream) throw new NotificationValidationError('Nonexistant stream') + + const request = await deps.getPendingAccessRequest( + requestId, + AccessRequestType.Stream + ) + if (!request) + throw new NotificationValidationError('Nonexistant stream access request') + + const owners = await deps.getStreamCollaborators(streamId, Roles.Stream.Owner) + if (!owners.length) throw new NotificationValidationError('Stream has no owners') + + const requester = await deps.getUser(request.requesterId) + if (!requester) + throw new NotificationValidationError( + 'User who made the request no longer exists' + ) + + const targetUsers = [] + for (const owner of owners) { + const [user, streamWithRole] = await Promise.all([ + deps.getUser(owner.id), + deps.getStream({ + streamId: request.resourceId, + userId: owner.id + }) + ]) + + if (!user) throw new NotificationValidationError('User no longer exists') + if (!streamWithRole) throw new NotificationValidationError('Nonexistant stream') + if (streamWithRole.role !== Roles.Stream.Owner) + throw new NotificationValidationError( + 'Only stream owners can receive notifications about stream access requests' + ) + + targetUsers.push(user) + } + + return { + request, + stream, + targetUsers, + requester + } + } + +const streamAccessRequestCreatedHandlerFactory = + ( + deps: { + getServerInfo: GetServerInfo + renderEmail: typeof renderEmail + sendEmail: typeof sendEmail + saveUserNotifications: StoreUserNotifications + getUserPreferenceForNotificationType: GetUserPreferenceForNotificationType + } & ValidateMessageDeps + ) => + async (event: EventType<'accessrequests.created'>) => { + const state = await validateMessageFactory(deps)(event) + const now = new Date() + const notifications: UserNotificationRecord[] = [] + for (const targetUser of state.targetUsers) { + const isSubscribedToEmail = await deps.getUserPreferenceForNotificationType( + targetUser.id, + NotificationType.NewStreamAccessRequest, + NotificationChannel.Email + ) + + notifications.push({ + id: cryptoRandomString({ length: 10 }), + userId: targetUser.id, + type: NotificationType.NewStreamAccessRequest, + read: false, + version: '1', + payload: { + streamId: state.stream.id, + requesterId: state.requester.id + }, + sendEmailAt: isSubscribedToEmail ? now : null, + createdAt: now, + updatedAt: now + }) + } + await deps.saveUserNotifications(notifications) + } + +export const handler = (event: EventType<'accessrequests.created'>) => { + const streamAccessRequestCreatedHandler = streamAccessRequestCreatedHandlerFactory({ + getServerInfo: getServerInfoFactory({ db }), + renderEmail, + sendEmail, + getUser: getUserFactory({ db }), + getStream: getStreamFactory({ db }), + getPendingAccessRequest: getPendingAccessRequestFactory({ db }), + getStreamCollaborators: getStreamCollaboratorsFactory({ db }), + saveUserNotifications: storeUserNotificationsFactory({ db }), + getUserPreferenceForNotificationType: getUserPreferenceForNotificationTypeFactory({ + getSavedUserNotificationPreferences: getSavedUserNotificationPreferencesFactory({ + db + }) + }) + }) + return streamAccessRequestCreatedHandler(event) +} + +export default handler diff --git a/packages/server/modules/notifications/services/events/handlers/streamAccessRequestFinalized.ts b/packages/server/modules/notifications/services/events/handlers/streamAccessRequestFinalized.ts new file mode 100644 index 000000000..74b148e38 --- /dev/null +++ b/packages/server/modules/notifications/services/events/handlers/streamAccessRequestFinalized.ts @@ -0,0 +1,114 @@ +import { db } from '@/db/knex' +import type { GetStream } from '@/modules/core/domain/streams/operations' +import type { GetUser } from '@/modules/core/domain/users/operations' +import { getStreamFactory } from '@/modules/core/repositories/streams' +import { getUserFactory } from '@/modules/core/repositories/users' +import type { + GetUserPreferenceForNotificationType, + StoreUserNotifications +} from '@/modules/notifications/domain/operations' +import { NotificationValidationError } from '@/modules/notifications/errors' +import { NotificationChannel } from '@/modules/notifications/helpers/types' +import { NotificationType } from '@speckle/shared/notifications' +import { storeUserNotificationsFactory } from '@/modules/notifications/repositories/userNotification' +import type { EventBusPayloads, EventType } from '@/modules/shared/services/eventBus' +import type { Nullable } from '@speckle/shared' +import cryptoRandomString from 'crypto-random-string' +import { getUserPreferenceForNotificationTypeFactory } from '@/modules/notifications/services/notificationPreferences' +import { getSavedUserNotificationPreferencesFactory } from '@/modules/notifications/repositories/userNotificationPreferences' + +type ValidateMessageDeps = { + getUser: GetUser + getStream: GetStream +} + +const validateEventFactory = + (deps: ValidateMessageDeps) => + async ({ + targetUserId, + resourceId, + finalizedBy + }: { + targetUserId: string + resourceId: Nullable + finalizedBy: string + }) => { + if (!resourceId) throw new NotificationValidationError('No stream provided') + + const [targetUser, finalizer, stream] = await Promise.all([ + deps.getUser(targetUserId), + deps.getUser(finalizedBy), + deps.getStream({ streamId: resourceId, userId: targetUserId }) + ]) + + if (!targetUser) + throw new NotificationValidationError('Invalid notification target user') + if (!finalizer) + throw new NotificationValidationError('Invalid notification finalizer') + if (!stream) throw new NotificationValidationError('Invalid stream') + if (!stream.role) + throw new NotificationValidationError( + 'User doesnt appear to have a role on the stream' + ) + + return { targetUser, finalizer, stream } + } + +const steamAccessRequestFinalizedHandlerFactory = + ( + deps: { + saveUserNotifications: StoreUserNotifications + getUserPreferenceForNotificationType: GetUserPreferenceForNotificationType + } & ValidateMessageDeps + ) => + async (args: { + payload: EventBusPayloads['accessrequests.finalized'] // TODO: smarter typing + }) => { + const { approved, request, finalizedBy } = args.payload + // notify only approvals + if (!approved) return + + const state = await validateEventFactory(deps)({ + targetUserId: request.requesterId, + resourceId: request.resourceId, + finalizedBy + }) + + const isSubscribedToEmail = await deps.getUserPreferenceForNotificationType( + state.targetUser.id, + NotificationType.StreamAccessRequestApproved, + NotificationChannel.Email + ) + const now = new Date() + await deps.saveUserNotifications([ + { + id: cryptoRandomString({ length: 10 }), + userId: state.targetUser.id, + type: NotificationType.StreamAccessRequestApproved, + read: false, + version: '1', + payload: { + streamId: state.stream.id + }, + sendEmailAt: isSubscribedToEmail ? now : null, + createdAt: now, + updatedAt: now + } + ]) + } + +export const handler = async (event: EventType<'accessrequests.finalized'>) => { + const steamAccessRequestFinalizedHandler = steamAccessRequestFinalizedHandlerFactory({ + getUser: getUserFactory({ db }), + getStream: getStreamFactory({ db }), + saveUserNotifications: storeUserNotificationsFactory({ db }), + getUserPreferenceForNotificationType: getUserPreferenceForNotificationTypeFactory({ + getSavedUserNotificationPreferences: getSavedUserNotificationPreferencesFactory({ + db + }) + }) + }) + return steamAccessRequestFinalizedHandler(event) +} + +export default handler diff --git a/packages/server/modules/notifications/services/events/queue.ts b/packages/server/modules/notifications/services/events/queue.ts new file mode 100644 index 000000000..89c4fc5a7 --- /dev/null +++ b/packages/server/modules/notifications/services/events/queue.ts @@ -0,0 +1,102 @@ +import { UninitializedResourceAccessError } from '@/modules/shared/errors' +import type { Optional } from '@/modules/shared/helpers/typeHelper' +import { getRedisUrl, isProdEnv, isTestEnv } from '@/modules/shared/helpers/envHelper' +import type Bull from 'bull' +import { initializeQueue as setupQueue } from '@speckle/shared/queue' +import { TIME_MS } from '@speckle/shared' +import type { NotificationEvents } from '@/modules/notifications/events/notificationListener' +import { notificationsLogger, Observability } from '@/observability/logging' +import { UnhandledNotificationError } from '@/modules/notifications/errors' +import CreatedOrUpdatedCommentHandler from '@/modules/notifications/services/events/handlers/createdOrUpdatedComment' +import StreamAccessRequestCreatedHandler from '@/modules/notifications/services/events/handlers/streamAccessRequestCreated' +import StreamAccessRequestFinalizedHandler from '@/modules/notifications/services/events/handlers/streamAccessRequestFinalized' +import { CommentEvents } from '@/modules/comments/domain/events' +import { AccessRequestEvents } from '@/modules/accessrequests/domain/events' + +export const NOTIFICATION_EVENTS_QUEUE = 'default:user-event-notifications' + +let queue: Optional + +export const buildNotificationEventsQueue = async (queueName: string) => + await setupQueue({ + queueName, + redisUrl: getRedisUrl(), + options: { + ...(!isTestEnv() + ? { + limiter: { + max: 10, + duration: TIME_MS.second + } + } + : {}), + defaultJobOptions: { + attempts: 1, + timeout: 10 * TIME_MS.second, + removeOnComplete: isProdEnv(), + removeOnFail: isProdEnv() + } + } + }) + +export function getQueue(): Bull.Queue { + if (!queue) { + throw new UninitializedResourceAccessError( + 'Attempting to use uninitialized Bull queue' + ) + } + + return queue +} + +export async function initializeNotificationEventsQueue() { + queue = await buildNotificationEventsQueue(NOTIFICATION_EVENTS_QUEUE) +} + +export async function initializeNotificationEventsConsumption() { + const queue = getQueue() + + void queue.process(async ({ data: event }: Bull.Job) => { + const notificationLogger = Observability.extendLoggerComponent( + notificationsLogger, + event.eventName + ) + + try { + notificationLogger.info('Handling notifications for event') + + switch (event.eventName) { + case CommentEvents.Created: + case CommentEvents.Updated: + await CreatedOrUpdatedCommentHandler(event) + break + case AccessRequestEvents.Created: + await StreamAccessRequestCreatedHandler(event) + break + case AccessRequestEvents.Finalized: + await StreamAccessRequestFinalizedHandler(event) + break + default: + throw new UnhandledNotificationError(null, { info: event }) + } + + notificationLogger.info('Handled notifications for event') + } catch (e: unknown) { + notificationsLogger.error(e) + } + }) +} + +/** + * Publish message onto the notifications queue + */ +export async function publishEventMessage(message: NotificationEvents) { + const queue = getQueue() + const job = await queue.add(message) + return job.id +} + +export async function shutdownEventQueue() { + if (!queue) return + await queue.close() +} diff --git a/packages/server/modules/notifications/services/notificationPreferences.ts b/packages/server/modules/notifications/services/notificationPreferences.ts index 70335ec7a..3b7c75d11 100644 --- a/packages/server/modules/notifications/services/notificationPreferences.ts +++ b/packages/server/modules/notifications/services/notificationPreferences.ts @@ -1,14 +1,13 @@ import type { NotificationPreferences } from '@/modules/notifications/helpers/types' -import { - NotificationChannel, - NotificationType -} from '@/modules/notifications/helpers/types' +import { NotificationChannel } from '@/modules/notifications/helpers/types' import { InvalidArgumentError } from '@/modules/shared/errors' import type { GetSavedUserNotificationPreferences, GetUserNotificationPreferences, + GetUserPreferenceForNotificationType, SaveUserNotificationPreferences } from '@/modules/notifications/domain/operations' +import { NotificationType } from '@speckle/shared/notifications' export const getUserNotificationPreferencesFactory = (deps: { @@ -35,6 +34,18 @@ function addDefaultPreferenceValues( return savedPreferences } +export const getUserPreferenceForNotificationTypeFactory = + (deps: { + getSavedUserNotificationPreferences: GetSavedUserNotificationPreferences + }): GetUserPreferenceForNotificationType => + async (userId, notificationType, notificationChannel) => { + const preferences = await deps.getSavedUserNotificationPreferences(userId) + if (!preferences) return true + + const notificationTypeSettings = preferences[notificationType] + return notificationTypeSettings?.[notificationChannel] ?? false + } + export const updateNotificationPreferencesFactory = (deps: { saveUserNotificationPreferences: SaveUserNotificationPreferences }) => async (userId: string, rawPreferences: Record): Promise => { diff --git a/packages/server/modules/notifications/services/handlers/activityDigest.ts b/packages/server/modules/notifications/services/publication/handlers/activityDigest.ts similarity index 99% rename from packages/server/modules/notifications/services/handlers/activityDigest.ts rename to packages/server/modules/notifications/services/publication/handlers/activityDigest.ts index 68dcbc074..32b4b6d23 100644 --- a/packages/server/modules/notifications/services/handlers/activityDigest.ts +++ b/packages/server/modules/notifications/services/publication/handlers/activityDigest.ts @@ -17,7 +17,7 @@ import path from 'path' import * as ejs from 'ejs' import { renderEmail } from '@/modules/emails/services/emailRendering' import { getUserNotificationPreferencesFactory } from '@/modules/notifications/services/notificationPreferences' -import { getSavedUserNotificationPreferencesFactory } from '@/modules/notifications/repositories' +import { getSavedUserNotificationPreferencesFactory } from '@/modules/notifications/repositories/userNotificationPreferences' import { db } from '@/db/knex' import type { GetUserNotificationPreferences } from '@/modules/notifications/domain/operations' import type { CreateActivitySummary } from '@/modules/activitystream/domain/operations' diff --git a/packages/server/modules/notifications/services/handlers/mentionedInComment.ts b/packages/server/modules/notifications/services/publication/handlers/mentionedInComment.ts similarity index 99% rename from packages/server/modules/notifications/services/handlers/mentionedInComment.ts rename to packages/server/modules/notifications/services/publication/handlers/mentionedInComment.ts index 9197bc6cd..aa31064ce 100644 --- a/packages/server/modules/notifications/services/handlers/mentionedInComment.ts +++ b/packages/server/modules/notifications/services/publication/handlers/mentionedInComment.ts @@ -116,7 +116,7 @@ function buildEmailTemplateMjml( Hello,

${author.name} has just mentioned you in a comment on the ${stream.name} project. - Please click on the button below to see the comment. + Please click on the button below to see the comment. `, bodyEnd: `

` @@ -130,7 +130,7 @@ function buildEmailTemplateText( return { bodyStart: `Hello - + ${author.name} has just mentioned you in a comment on the ${stream.name} project. Please open the link below to see the comment.`, bodyEnd: undefined diff --git a/packages/server/modules/notifications/services/handlers/newStreamAccessRequest.ts b/packages/server/modules/notifications/services/publication/handlers/newStreamAccessRequest.ts similarity index 100% rename from packages/server/modules/notifications/services/handlers/newStreamAccessRequest.ts rename to packages/server/modules/notifications/services/publication/handlers/newStreamAccessRequest.ts diff --git a/packages/server/modules/notifications/services/handlers/streamAccessRequestApproved.ts b/packages/server/modules/notifications/services/publication/handlers/streamAccessRequestApproved.ts similarity index 100% rename from packages/server/modules/notifications/services/handlers/streamAccessRequestApproved.ts rename to packages/server/modules/notifications/services/publication/handlers/streamAccessRequestApproved.ts diff --git a/packages/server/modules/notifications/services/publication.ts b/packages/server/modules/notifications/services/publication/publishNotification.ts similarity index 58% rename from packages/server/modules/notifications/services/publication.ts rename to packages/server/modules/notifications/services/publication/publishNotification.ts index 0bb67a9f2..a0b06cdab 100644 --- a/packages/server/modules/notifications/services/publication.ts +++ b/packages/server/modules/notifications/services/publication/publishNotification.ts @@ -2,10 +2,12 @@ import type { NotificationPublisher, NotificationTypeMessageMap } from '@/modules/notifications/helpers/types' -import { publishMessage } from '@/modules/notifications/services/queue' +import { publishMessage } from '@/modules/notifications/services/publication/queue' +import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper' /** * Publish a notification + * @deprecated new implementations should be built using the notificationListener handlers */ export const publishNotification: NotificationPublisher = async (type, params) => { const msg = { @@ -13,5 +15,9 @@ export const publishNotification: NotificationPublisher = async (type, params) = ...params } as NotificationTypeMessageMap[typeof type] + // return is only consumed by specs + // this satisfies ts + if (isNotificationListenerEnabled()) return -1 + return await publishMessage(msg) } diff --git a/packages/server/modules/notifications/services/queue.ts b/packages/server/modules/notifications/services/publication/queue.ts similarity index 96% rename from packages/server/modules/notifications/services/queue.ts rename to packages/server/modules/notifications/services/publication/queue.ts index eb7337fed..772e47cee 100644 --- a/packages/server/modules/notifications/services/queue.ts +++ b/packages/server/modules/notifications/services/publication/queue.ts @@ -8,7 +8,6 @@ import { import type { NotificationHandler, NotificationMessage, - NotificationType, NotificationTypeHandlers } from '@/modules/notifications/helpers/types' import { isNotificationMessage } from '@/modules/notifications/helpers/types' @@ -21,6 +20,7 @@ import { ensureErrorOrWrapAsCause } from '@/modules/shared/errors/ensureError' import { TIME_MS } from '@speckle/shared' import { getEventBus } from '@/modules/shared/services/eventBus' import { NotificationsEvents } from '@/modules/notifications/domain/events' +import type { NotificationType } from '@speckle/shared/notifications' export type NotificationJobResult = { status: NotificationJobResultsStatus @@ -87,7 +87,7 @@ export function getQueue(): Bull.Queue { /** * Initialize notifications queue */ -export async function initializeQueue() { +export async function initializePublicationQueue() { queue = await buildNotificationsQueue(NOTIFICATIONS_QUEUE) } @@ -186,7 +186,7 @@ export async function consumeIncomingNotifications() { }) } -export async function shutdownQueue() { +export async function shutdownPublicationQueue() { if (!queue) return await queue.close() } diff --git a/packages/server/modules/notifications/tasks/delayedNotifications.ts b/packages/server/modules/notifications/tasks/delayedNotifications.ts new file mode 100644 index 000000000..3bf145856 --- /dev/null +++ b/packages/server/modules/notifications/tasks/delayedNotifications.ts @@ -0,0 +1,105 @@ +import { db } from '@/db/knex' +import { + acquireTaskLockFactory, + releaseTaskLockFactory +} from '@/modules/core/repositories/scheduledTasks' +import { scheduleExecutionFactory } from '@/modules/core/services/taskScheduler' +import type { Logger } from '@/observability/logging' +import { + getNextEmailNotificationFactory, + updateUserNotificationFactory +} from '@/modules/notifications/repositories/userNotification' +import { NotificationType } from '@speckle/shared/notifications' +import MentionedInCommentHandler from '@/modules/notifications/tasks/handlers/mentionedInComment' +import StreamAccessRequestApprovedHandler from '@/modules/notifications/tasks/handlers/streamAccessRequestApproved' +import NewStreamAccessRequestHandler from '@/modules/notifications/tasks/handlers/newStreamAccessRequest' +import { ensureNotificationToLatestVersion } from '@/modules/notifications/helpers/toLatestVersion' + +type EmailNotificationResult = { notificationId: string } | null + +const handleNextEmailNotification = async (deps: { + logger: Logger +}): Promise => + db.transaction(async (trx) => { + const baseNotification = await getNextEmailNotificationFactory({ db: trx })() + if (!baseNotification) return null + + const notification = ensureNotificationToLatestVersion(baseNotification) + if (!notification) return null + + try { + switch (notification.type) { + case NotificationType.MentionedInComment: + await MentionedInCommentHandler(notification) + break + case NotificationType.StreamAccessRequestApproved: + await StreamAccessRequestApprovedHandler(notification) + break + case NotificationType.NewStreamAccessRequest: + await NewStreamAccessRequestHandler(notification) + break + default: + deps.logger.error( + { + type: notification.type, + notificationId: notification.id + }, + `No handler scheduled notification type. Skipping.` + ) + break + } + } catch (error) { + deps.logger.error( + { + error, + type: notification.type, + notificationId: notification.id + }, + `Error handling notification. Skipping.` + ) + } + + await updateUserNotificationFactory({ db: trx })({ + id: notification.id, + userId: notification.userId, + update: { + sendEmailAt: null, + updatedAt: new Date() + } + }) + + return { notificationId: notification.id } + }) + +export const emitDelayedEmailNotifications = async (deps: { logger: Logger }) => { + let result: EmailNotificationResult + const MAX_ITERATIONS = 10_000 + let iterationCount = 0 + + do { + if (iterationCount++ >= MAX_ITERATIONS) { + deps.logger.error(`Reached max iteration limit of ${MAX_ITERATIONS}.`) + break + } + + result = await handleNextEmailNotification(deps) + } while (result) +} + +export const scheduleDelayedEmailNotifications = async () => { + const scheduleExecution = scheduleExecutionFactory({ + acquireTaskLock: acquireTaskLockFactory({ db }), + releaseTaskLock: releaseTaskLockFactory({ db }) + }) + + const everyMin = '*/1 * * * *' + return scheduleExecution( + everyMin, + 'DelayedEmailNotifications', + async (_scheduledTime, { logger }) => { + await emitDelayedEmailNotifications({ + logger + }) + } + ) +} diff --git a/packages/server/modules/notifications/tasks/handlers/mentionedInComment.ts b/packages/server/modules/notifications/tasks/handlers/mentionedInComment.ts new file mode 100644 index 000000000..6498dab76 --- /dev/null +++ b/packages/server/modules/notifications/tasks/handlers/mentionedInComment.ts @@ -0,0 +1,150 @@ +import { db } from '@/db/knex' +import type { GetComment } from '@/modules/comments/domain/operations' +import { getCommentFactory } from '@/modules/comments/repositories/comments' +import type { GetServerInfo } from '@/modules/core/domain/server/operations' +import type { GetStream } from '@/modules/core/domain/streams/operations' +import type { GetUser } from '@/modules/core/domain/users/operations' +import { getCommentRoute } from '@/modules/core/helpers/routeHelper' +import { getServerInfoFactory } from '@/modules/core/repositories/server' +import { getStreamFactory } from '@/modules/core/repositories/streams' +import { getUserFactory } from '@/modules/core/repositories/users' +import type { EmailTemplateParams } from '@/modules/emails/domain/operations' +import { renderEmail } from '@/modules/emails/services/emailRendering' +import { sendEmail } from '@/modules/emails/services/sending' +import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' +import type { UserNotificationRecord } from '@/modules/notifications/helpers/types' +import type { NotificationType } from '@speckle/shared/notifications' +import { getFrontendOrigin } from '@/modules/shared/helpers/envHelper' +import type { Knex } from 'knex' +import { validateCommentNotification } from '@/modules/notifications/services/events/handlers/createdOrUpdatedComment' + +type MentionedInCommentNotification = Extract< + UserNotificationRecord, + { type: NotificationType.MentionedInComment } +> + +function buildEmailTemplateMjml( + state: ReturnType +): EmailTemplateParams['mjml'] { + const { author, stream } = state + + return { + bodyStart: ` + + Hello,
+
+ ${author.name} has just mentioned you in a comment on the ${stream.name} project. + Please click on the button below to see the comment. +
+ `, + bodyEnd: `

` + } +} + +function buildEmailTemplateText( + state: ReturnType +): EmailTemplateParams['text'] { + const { author, stream } = state + + return { + bodyStart: `Hello + +${author.name} has just mentioned you in a comment on the ${stream.name} project. +Please open the link below to see the comment.`, + bodyEnd: undefined + } +} + +function buildEmailTemplateParams( + state: ReturnType +): EmailTemplateParams { + const { + commitOrObjectId: { objectId, commitId }, + stream, + threadComment + } = state + + const commentRoute = getCommentRoute(stream.id, threadComment.id, { + objectId, + commitId + }) + const url = new URL(commentRoute, getFrontendOrigin()).toString() + + return { + mjml: buildEmailTemplateMjml(state), + text: buildEmailTemplateText(state), + cta: { + url, + title: 'View comment thread' + } + } +} + +/** + * Notification that is triggered when a user is mentioned in a comment + */ +const mentionedInCommentEmailHandlerFactory = + (deps: { + getUser: GetUser + getStream: GetStream + getCommentResolver: (deps: { projectDb: Knex }) => GetComment + getServerInfo: GetServerInfo + renderEmail: typeof renderEmail + sendEmail: typeof sendEmail + }) => + async (notification: MentionedInCommentNotification) => { + const { threadId, commentId, authorId, streamId } = notification.payload + + const isCommentAndThreadTheSame = threadId === commentId + const projectDb = await getProjectDbClient({ projectId: streamId }) + const getComment = deps.getCommentResolver({ projectDb }) + + const [targetUser, author, stream, threadComment, comment, serverInfo] = + await Promise.all([ + deps.getUser(notification.userId), + deps.getUser(authorId), + deps.getStream({ streamId }), + getComment({ id: threadId }), + isCommentAndThreadTheSame ? null : getComment({ id: commentId }), + deps.getServerInfo() + ]) + + const mentionComment = isCommentAndThreadTheSame ? threadComment : comment + + // Validate message + const state = validateCommentNotification({ + targetUser, + author, + stream, + threadComment, + mentionComment, + serverInfo + }) + + const templateParams = buildEmailTemplateParams(state) + const { text, html } = await deps.renderEmail( + templateParams, + serverInfo, + targetUser + ) + await deps.sendEmail({ + to: state.targetUser.email, + text, + html, + subject: "You've just been mentioned in a Speckle comment" + }) + } + +export const handler = async (notification: MentionedInCommentNotification) => { + const mentionedInCommentHandler = mentionedInCommentEmailHandlerFactory({ + getUser: getUserFactory({ db }), + getStream: getStreamFactory({ db }), + getCommentResolver: ({ projectDb }) => getCommentFactory({ db: projectDb }), + getServerInfo: getServerInfoFactory({ db }), + renderEmail, + sendEmail + }) + return mentionedInCommentHandler(notification) +} + +export default handler diff --git a/packages/server/modules/notifications/tasks/handlers/newStreamAccessRequest.ts b/packages/server/modules/notifications/tasks/handlers/newStreamAccessRequest.ts new file mode 100644 index 000000000..d3a5e1f16 --- /dev/null +++ b/packages/server/modules/notifications/tasks/handlers/newStreamAccessRequest.ts @@ -0,0 +1,170 @@ +import { getPendingAccessRequestFactory } from '@/modules/accessrequests/repositories' +import { NotificationValidationError } from '@/modules/notifications/errors' +import { Roles } from '@/modules/core/helpers/mainConstants' +import { + buildAbsoluteFrontendUrlFromPath, + getStreamCollaboratorsRoute +} from '@/modules/core/helpers/routeHelper' +import { sendEmail } from '@/modules/emails/services/sending' +import { renderEmail } from '@/modules/emails/services/emailRendering' +import { db } from '@/db/knex' +import type { GetPendingAccessRequest } from '@/modules/accessrequests/domain/operations' +import type { + GetStream, + GetStreamCollaborators +} from '@/modules/core/domain/streams/operations' +import { + getStreamCollaboratorsFactory, + getStreamFactory +} from '@/modules/core/repositories/streams' +import type { GetUser } from '@/modules/core/domain/users/operations' +import { getUserFactory } from '@/modules/core/repositories/users' +import type { GetServerInfo } from '@/modules/core/domain/server/operations' +import { getServerInfoFactory } from '@/modules/core/repositories/server' +import type { EmailTemplateParams } from '@/modules/emails/domain/operations' +import type { StoreUserNotifications } from '@/modules/notifications/domain/operations' +import type { UserNotificationRecord } from '@/modules/notifications/helpers/types' +import type { NotificationType } from '@speckle/shared/notifications' +import { storeUserNotificationsFactory } from '@/modules/notifications/repositories/userNotification' + +type ValidateMessageDeps = { + getPendingAccessRequest: GetPendingAccessRequest + getUser: GetUser + getStream: GetStream + getStreamCollaborators: GetStreamCollaborators +} + +type NewSreamAccessRequestNotification = Extract< + UserNotificationRecord, + { type: NotificationType.NewStreamAccessRequest } +> + +const validateMessageFactory = + (deps: ValidateMessageDeps) => + async (notification: NewSreamAccessRequestNotification) => { + const { streamId, requesterId } = notification.payload + const userId = notification.userId + + if (!streamId) throw new NotificationValidationError('No stream ID provided') + + const stream = await deps.getStream({ streamId }) + if (!stream) throw new NotificationValidationError('Nonexistant stream') + + const requester = await deps.getUser(requesterId) + if (!requester) + throw new NotificationValidationError( + 'User who made the request no longer exists' + ) + + const [targetUser, streamWithRole] = await Promise.all([ + deps.getUser(userId), + deps.getStream({ + streamId, + userId + }) + ]) + + if (!targetUser) throw new NotificationValidationError('User no longer exists') + if (!streamWithRole) throw new NotificationValidationError('Nonexistant stream') + if (streamWithRole.role !== Roles.Stream.Owner) + throw new NotificationValidationError( + 'Only stream owners can receive notifications about stream access requests' + ) + + return { + stream, + targetUser, + requester + } + } + +type ValidatedMessageState = Awaited< + ReturnType> +> + +function buildEmailTemplateHtml( + state: ValidatedMessageState +): EmailTemplateParams['mjml'] { + const { requester, stream } = state + + return { + bodyStart: ` +Hello,
+
+${requester.name} requested access to the ${stream.name} project. +You can add them as a collaborator by clicking the button below. +
+`, + bodyEnd: ` +You received this email because you are an owner on ${stream.name}. +` + } +} + +function buildEmailTemplateText( + state: ValidatedMessageState +): EmailTemplateParams['text'] { + const { requester, stream } = state + + return { + bodyStart: `Hello,\n\n${requester.name} requested access to the ${stream.name} project. You can add them as a collaborator by opening the link below.`, + bodyEnd: `You received this email because you are an owner on ${stream.name}` + } +} + +function buildEmailTemplateParams(state: ValidatedMessageState): EmailTemplateParams { + const { stream } = state + + return { + mjml: buildEmailTemplateHtml(state), + text: buildEmailTemplateText(state), + cta: { + title: 'Review Request', + url: buildAbsoluteFrontendUrlFromPath(getStreamCollaboratorsRoute(stream.id)) + } + } +} + +const newSreamAccessRequestHandlerFactory = + ( + deps: { + getServerInfo: GetServerInfo + renderEmail: typeof renderEmail + sendEmail: typeof sendEmail + saveUserNotifications: StoreUserNotifications + } & ValidateMessageDeps + ) => + async (notification: NewSreamAccessRequestNotification) => { + const state = await validateMessageFactory(deps)(notification) + + const htmlTemplateParams = buildEmailTemplateParams(state) + const serverInfo = await deps.getServerInfo() + const { html, text } = await deps.renderEmail( + htmlTemplateParams, + serverInfo, + state.targetUser + ) + + await deps.sendEmail({ + to: state.targetUser.email, + text, + html, + subject: 'A user requested access to your project' + }) + } + +export const handler = (notification: NewSreamAccessRequestNotification) => { + const streamAccessRequestCreatedHandler = newSreamAccessRequestHandlerFactory({ + getServerInfo: getServerInfoFactory({ db }), + renderEmail, + sendEmail, + getUser: getUserFactory({ db }), + getStream: getStreamFactory({ db }), + getPendingAccessRequest: getPendingAccessRequestFactory({ db }), + getStreamCollaborators: getStreamCollaboratorsFactory({ db }), + saveUserNotifications: storeUserNotificationsFactory({ db }) + }) + return streamAccessRequestCreatedHandler(notification) +} + +export default handler diff --git a/packages/server/modules/notifications/tasks/handlers/streamAccessRequestApproved.ts b/packages/server/modules/notifications/tasks/handlers/streamAccessRequestApproved.ts new file mode 100644 index 000000000..aed7f173f --- /dev/null +++ b/packages/server/modules/notifications/tasks/handlers/streamAccessRequestApproved.ts @@ -0,0 +1,134 @@ +import { db } from '@/db/knex' +import type { GetServerInfo } from '@/modules/core/domain/server/operations' +import type { GetStream } from '@/modules/core/domain/streams/operations' +import type { GetUser } from '@/modules/core/domain/users/operations' +import { + buildAbsoluteFrontendUrlFromPath, + getStreamRoute +} from '@/modules/core/helpers/routeHelper' +import { getServerInfoFactory } from '@/modules/core/repositories/server' +import { getStreamFactory } from '@/modules/core/repositories/streams' +import { getUserFactory } from '@/modules/core/repositories/users' +import type { EmailTemplateParams } from '@/modules/emails/domain/operations' +import { renderEmail } from '@/modules/emails/services/emailRendering' +import { sendEmail } from '@/modules/emails/services/sending' +import { NotificationValidationError } from '@/modules/notifications/errors' +import type { UserNotificationRecord } from '@/modules/notifications/helpers/types' +import type { NotificationType } from '@speckle/shared/notifications' + +type ValidateMessageDeps = { + getUser: GetUser + getStream: GetStream +} + +type StreamAccessApprovedNotification = Extract< + UserNotificationRecord, + { type: NotificationType.StreamAccessRequestApproved } +> + +const validateNotificationFactory = + (deps: ValidateMessageDeps) => + async (notification: StreamAccessApprovedNotification) => { + const streamId = notification.payload.streamId + if (!streamId) throw new NotificationValidationError('No stream provided') + + const [targetUser, stream] = await Promise.all([ + deps.getUser(notification.userId), + deps.getStream({ streamId, userId: notification.userId }) + ]) + + if (!targetUser) + throw new NotificationValidationError('Invalid notification target user') + if (!stream) throw new NotificationValidationError('Invalid stream') + if (!stream.role) + throw new NotificationValidationError( + 'User doesnt appear to have a role on the stream' + ) + + return { targetUser, stream } + } + +type ValidatedMessageState = Awaited< + ReturnType> +> + +function buildEmailTemplateMjml( + state: ValidatedMessageState +): EmailTemplateParams['mjml'] { + const { stream } = state + + return { + bodyStart: ` +Hello,
+
+You have just been granted access to the ${stream.name} project. Check it out below: +
+`, + bodyEnd: ` +You received this email because you requested access to this project +` + } +} + +function buildEmailTemplateText( + state: ValidatedMessageState +): EmailTemplateParams['text'] { + const { stream } = state + + return { + bodyStart: `Hello,\n\nYou have just been granted access to the ${stream.name} stream. Check it below:`, + bodyEnd: `You received this email because you requested access to this stream` + } +} + +function buildEmailTemplateParams(state: ValidatedMessageState): EmailTemplateParams { + const { stream } = state + return { + mjml: buildEmailTemplateMjml(state), + text: buildEmailTemplateText(state), + cta: { + title: 'View Stream', + url: buildAbsoluteFrontendUrlFromPath(getStreamRoute(stream.id)) + } + } +} + +const steamAccessRequestFinalizedHandlerFactory = + ( + deps: { + getServerInfo: GetServerInfo + renderEmail: typeof renderEmail + sendEmail: typeof sendEmail + } & ValidateMessageDeps + ) => + async (notification: StreamAccessApprovedNotification) => { + const state = await validateNotificationFactory(deps)(notification) + + const htmlTemplateParams = buildEmailTemplateParams(state) + const serverInfo = await deps.getServerInfo() + const { html, text } = await deps.renderEmail( + htmlTemplateParams, + serverInfo, + state.targetUser + ) + + await deps.sendEmail({ + to: state.targetUser.email, + text, + html, + subject: 'Your project access request has been approved' + }) + } + +export const handler = async (notification: StreamAccessApprovedNotification) => { + const steamAccessRequestFinalizedHandler = steamAccessRequestFinalizedHandlerFactory({ + getServerInfo: getServerInfoFactory({ db }), + renderEmail, + sendEmail, + getUser: getUserFactory({ db }), + getStream: getStreamFactory({ db }) + }) + return steamAccessRequestFinalizedHandler(notification) +} + +export default handler diff --git a/packages/server/modules/notifications/tests/activityDigest.spec.ts b/packages/server/modules/notifications/tests/activityDigest.spec.ts index 1f4da20d6..0ac4d1b10 100644 --- a/packages/server/modules/notifications/tests/activityDigest.spec.ts +++ b/packages/server/modules/notifications/tests/activityDigest.spec.ts @@ -16,7 +16,7 @@ import { renderEmail } from '@/modules/emails/services/emailRendering' import type { DigestTopic, Digest -} from '@/modules/notifications/services/handlers/activityDigest' +} from '@/modules/notifications/services/publication/handlers/activityDigest' import { digestMostActiveStream, mostActiveComment, @@ -26,7 +26,7 @@ import { digestActiveStreams, closingOverview, prepareSummaryEmailFactory -} from '@/modules/notifications/services/handlers/activityDigest' +} from '@/modules/notifications/services/publication/handlers/activityDigest' import { expect } from 'chai' import { range } from 'lodash-es' diff --git a/packages/server/modules/notifications/tests/helpers.ts b/packages/server/modules/notifications/tests/helpers.ts new file mode 100644 index 000000000..ff3e11b1f --- /dev/null +++ b/packages/server/modules/notifications/tests/helpers.ts @@ -0,0 +1,35 @@ +import { db } from '@/db/knex' +import { type UserNotificationRecord } from '@/modules/notifications/helpers/types' +import { NotificationType } from '@speckle/shared/notifications' +import { storeUserNotificationsFactory } from '@/modules/notifications/repositories/userNotification' +import cryptoRandomString from 'crypto-random-string' +import { assign } from 'lodash-es' + +export const buildTestNotification = ( + overrides?: Partial +): UserNotificationRecord => + assign( + { + id: cryptoRandomString({ length: 10 }), + userId: cryptoRandomString({ length: 10 }), + type: NotificationType.MentionedInComment, + version: '1', + read: false, + payload: {}, + sendEmailAt: null, + createdAt: new Date(), + updatedAt: new Date() + }, + overrides + ) + +export const createTestNotification = async ( + notification?: UserNotificationRecord +): Promise => { + const storeUserNotifications = storeUserNotificationsFactory({ db }) + + const storeNotification = notification || buildTestNotification() + await storeUserNotifications([storeNotification]) + + return storeNotification +} diff --git a/packages/server/modules/notifications/tests/notifications.graph.spec.ts b/packages/server/modules/notifications/tests/notifications.graph.spec.ts new file mode 100644 index 000000000..5b22fa55f --- /dev/null +++ b/packages/server/modules/notifications/tests/notifications.graph.spec.ts @@ -0,0 +1,184 @@ +import { db } from '@/db/knex' +import { UserNotifications } from '@/modules/core/dbSchema' +import { + GetUserNotificationsDocument, + UserBulkDeleteNotidicationDocument, + UserBulkUpdateNotificationsDocument +} from '@/modules/core/graph/generated/graphql' +import { getUserNotificationsFactory } from '@/modules/notifications/repositories/userNotification' +import { + buildTestNotification, + createTestNotification +} from '@/modules/notifications/tests/helpers' +import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper' +import type { BasicTestUser } from '@/test/authHelper' +import { createTestUser } from '@/test/authHelper' +import type { TestApolloServer } from '@/test/graphqlHelper' +import { testApolloServer } from '@/test/graphqlHelper' +import { beforeEachContext, truncateTables } from '@/test/hooks' +import { expect } from 'chai' +import { times } from 'lodash-es' + +isNotificationListenerEnabled() + ? describe('Notifications GQL', () => { + let apollo: TestApolloServer + let user: BasicTestUser + let anotherUser: BasicTestUser + + before(async () => { + await beforeEachContext() + user = await createTestUser() + anotherUser = await createTestUser() + + apollo = await testApolloServer({ authUserId: user.id }) + }) + + beforeEach(async () => { + await truncateTables([UserNotifications.name]) + }) + + it('pulls only your notifications', async () => { + await createTestNotification( + buildTestNotification({ + userId: user.id + }) + ) + await createTestNotification( + buildTestNotification({ + userId: anotherUser.id + }) + ) + await createTestNotification( + buildTestNotification({ + userId: user.id + }) + ) + + const { data } = await apollo.execute( + GetUserNotificationsDocument, + {}, + { assertNoErrors: true } + ) + + expect(data?.activeUser?.notifications.items).to.have.lengthOf(2) + }) + + it('paginates your notifications', async () => { + await Promise.all( + times(50).map(async () => + createTestNotification( + buildTestNotification({ + userId: user.id + }) + ) + ) + ) + + const { data } = await apollo.execute( + GetUserNotificationsDocument, + { limit: 10 }, + { assertNoErrors: true } + ) + + expect(data?.activeUser?.notifications.items).to.have.lengthOf(10) + expect(data?.activeUser?.notifications.cursor).to.be.a('string') + expect(data?.activeUser?.notifications.totalCount).to.be.eq(50) + }) + + it('allows deleting only your notifications', async () => { + const n1 = await createTestNotification( + buildTestNotification({ + userId: user.id + }) + ) + const n2 = await createTestNotification( + buildTestNotification({ + userId: anotherUser.id + }) + ) + const n3 = await createTestNotification( + buildTestNotification({ + userId: user.id + }) + ) + + await apollo.execute( + UserBulkDeleteNotidicationDocument, + { + ids: [n1.id, n2.id, n3.id] // n2 shouldn't be deleted + }, + { assertNoErrors: true } + ) + const { data } = await apollo.execute( + GetUserNotificationsDocument, + {}, + { assertNoErrors: true } + ) + const otherNotifications = await getUserNotificationsFactory({ db })({ + userId: anotherUser.id, + cursor: null, + limit: null + }) + + expect(data?.activeUser?.notifications.totalCount).to.be.equal(0) + expect(data?.activeUser?.notifications.items).to.have.lengthOf(0) + expect(otherNotifications.items).to.have.lengthOf(1) + }) + + it('allows updating read field in the notification', async () => { + const n1 = await createTestNotification( + buildTestNotification({ + userId: user.id, + read: false + }) + ) + const n2 = await createTestNotification( + buildTestNotification({ + userId: anotherUser.id, + read: false + }) + ) + const n3 = await createTestNotification( + buildTestNotification({ + userId: user.id, + read: false + }) + ) + + await apollo.execute( + UserBulkUpdateNotificationsDocument, + { + input: [ + { + id: n1.id, + read: true + }, + { + id: n2.id, + read: true // n2 shouldn't be updated + }, + { + id: n3.id, + read: true + } + ] + }, + { assertNoErrors: true } + ) + const { data } = await apollo.execute( + GetUserNotificationsDocument, + {}, + { assertNoErrors: true } + ) + const otherNotifications = await getUserNotificationsFactory({ db })({ + userId: anotherUser.id, + cursor: null, + limit: null + }) + + expect(data?.activeUser?.notifications.items[0].read).to.be.true + expect(data?.activeUser?.notifications.items[1].read).to.be.true + expect(otherNotifications.items[0].read).to.be.false + }) + }) + : {} diff --git a/packages/server/modules/notifications/tests/notifications.spec.ts b/packages/server/modules/notifications/tests/notifications.spec.ts deleted file mode 100644 index 6d54995ba..000000000 --- a/packages/server/modules/notifications/tests/notifications.spec.ts +++ /dev/null @@ -1,130 +0,0 @@ -import type { MentionedInCommentData } from '@/modules/notifications/helpers/types' -import { NotificationType } from '@/modules/notifications/helpers/types' -import { publishNotification } from '@/modules/notifications/services/publication' -import type { NotificationsStateManager } from '@/test/notificationsHelper' -import { - buildNotificationsStateTracker, - purgeNotifications -} from '@/test/notificationsHelper' -import { expect } from 'chai' -import { - InvalidNotificationError, - NotificationValidationError, - UnhandledNotificationError -} from '@/modules/notifications/errors' -import { NotificationJobResultsStatus } from '@/modules/notifications/services/queue' -import { getEventBus } from '@/modules/shared/services/eventBus' -import { NotificationsEvents } from '@/modules/notifications/domain/events' - -describe('Notifications', () => { - let notificationsState: NotificationsStateManager - - before(async () => { - await purgeNotifications() - notificationsState = await buildNotificationsStateTracker() - }) - - after(async () => { - notificationsState.destroy() - }) - - afterEach(() => { - notificationsState.reset() - }) - - it('can be emitted and routed to proper handler on consumption', async () => { - const targetUserId = '1234555' - const data: MentionedInCommentData = { - threadId: 'aaa', - commentId: 'bbb', - authorId: 'ccc', - streamId: 'ddd' - } - - // Enqueue notification - const msgId = await publishNotification(NotificationType.MentionedInComment, { - targetUserId, - data - }) - - // Wait for ack - await notificationsState.waitForMsgAck(msgId) - - const enqueuedMessage = notificationsState.collectedMessages().at(-1)! - expect(enqueuedMessage).to.be.ok - expect(enqueuedMessage?.targetUserId).to.eq(targetUserId) - expect(enqueuedMessage?.type).to.eq(NotificationType.MentionedInComment) - expect(enqueuedMessage?.data).to.deep.equalInAnyOrder(data) - }) - - it('fail safely when emitted with an unexpected structure', async () => { - // Enqueue notification with invalid structure - const msgId = await publishNotification(NotificationType.MentionedInComment, { - a: 1, - b: 2 - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } as any) - - const { err } = await notificationsState.waitForMsgAck(msgId) - - expect(err).to.be.ok - expect(err instanceof InvalidNotificationError).to.be.true - expect(err?.message).to.contain('invalid notification') - }) - - it('fail safely when emitted with an unexpected type', async () => { - // Enqueue notification with invalid structure - const msgId = await publishNotification('booooooooo' as NotificationType, { - targetUserId: '123', - data: { - a: 123 - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } as any - }) - - const { err } = await notificationsState.waitForMsgAck(msgId) - expect(err).to.be.ok - expect(err instanceof UnhandledNotificationError).to.be.true - }) - - const validationErrorDataSet = [ - { - display: 'successful', - error: new NotificationValidationError('expected validation isue') - }, - { display: 'unsuccessful', error: new Error('ooohhhh') } - ] - - validationErrorDataSet.forEach(({ display, error }) => { - it(`fail with ${display} ack when handler throws ${error.name}`, async () => { - const data: MentionedInCommentData = { - threadId: 'aaa', - commentId: 'bbb', - authorId: 'ccc', - streamId: 'ddd' - } - - getEventBus().listenOnce(NotificationsEvents.Received, () => { - throw error - }) - - const msgId = await publishNotification(NotificationType.MentionedInComment, { - targetUserId: '123', - data - }) - - const { err, result } = await notificationsState.waitForMsgAck(msgId) - - const isValidationError = error instanceof NotificationValidationError - if (isValidationError) { - expect(err).to.be.not.ok - expect(result?.status).to.eq(NotificationJobResultsStatus.ValidationError) - } else { - expect(err).to.be.ok - expect(err?.name).to.eq(error.name) - expect(err?.message).to.eq(error.message) - expect(result).to.be.not.ok - } - }) - }) -}) diff --git a/packages/server/modules/notifications/tests/notificationsPreferences.spec.ts b/packages/server/modules/notifications/tests/notificationsPreferences.spec.ts index d74ac32ca..74ca524f4 100644 --- a/packages/server/modules/notifications/tests/notificationsPreferences.spec.ts +++ b/packages/server/modules/notifications/tests/notificationsPreferences.spec.ts @@ -3,10 +3,8 @@ import { UserNotificationPreferences, Users } from '@/modules/core/dbSchema' import type { BasicTestUser } from '@/test/authHelper' import { createTestUsers } from '@/test/authHelper' import { expect } from 'chai' -import { - NotificationType, - NotificationChannel -} from '@/modules/notifications/helpers/types' +import { NotificationType } from '@speckle/shared/notifications' +import { NotificationChannel } from '@/modules/notifications/helpers/types' import { BaseError } from '@/modules/shared/errors' import { getUserNotificationPreferencesFactory, @@ -15,7 +13,7 @@ import { import { getSavedUserNotificationPreferencesFactory, saveUserNotificationPreferencesFactory -} from '@/modules/notifications/repositories' +} from '@/modules/notifications/repositories/userNotificationPreferences' import { db } from '@/db/knex' const getSavedUserNotificationPreferences = getSavedUserNotificationPreferencesFactory({ diff --git a/packages/server/modules/notifications/tests/pushNotifications.spec.ts b/packages/server/modules/notifications/tests/pushNotifications.spec.ts new file mode 100644 index 000000000..31959afea --- /dev/null +++ b/packages/server/modules/notifications/tests/pushNotifications.spec.ts @@ -0,0 +1,133 @@ +import type { MentionedInCommentData } from '@/modules/notifications/helpers/types' +import { NotificationType } from '@speckle/shared/notifications' +import { publishNotification } from '@/modules/notifications/services/publication/publishNotification' +import type { NotificationsStateManager } from '@/test/notificationsHelper' +import { + buildNotificationsStateTracker, + purgeNotifications +} from '@/test/notificationsHelper' +import { expect } from 'chai' +import { + InvalidNotificationError, + NotificationValidationError, + UnhandledNotificationError +} from '@/modules/notifications/errors' +import { NotificationJobResultsStatus } from '@/modules/notifications/services/publication/queue' +import { getEventBus } from '@/modules/shared/services/eventBus' +import { NotificationsEvents } from '@/modules/notifications/domain/events' +import { isNotificationListenerEnabled } from '@/modules/shared/helpers/envHelper' + +!isNotificationListenerEnabled() + ? describe('Notifications', () => { + let notificationsState: NotificationsStateManager + + before(async () => { + await purgeNotifications() + notificationsState = await buildNotificationsStateTracker() + }) + + after(async () => { + notificationsState.destroy() + }) + + afterEach(() => { + notificationsState.reset() + }) + + it('can be emitted and routed to proper handler on consumption', async () => { + const targetUserId = '1234555' + const data: MentionedInCommentData = { + threadId: 'aaa', + commentId: 'bbb', + authorId: 'ccc', + streamId: 'ddd' + } + + // Enqueue notification + const msgId = await publishNotification(NotificationType.MentionedInComment, { + targetUserId, + data + }) + + // Wait for ack + await notificationsState.waitForMsgAck(msgId) + + const enqueuedMessage = notificationsState.collectedMessages().at(-1)! + expect(enqueuedMessage).to.be.ok + expect(enqueuedMessage?.targetUserId).to.eq(targetUserId) + expect(enqueuedMessage?.type).to.eq(NotificationType.MentionedInComment) + expect(enqueuedMessage?.data).to.deep.equalInAnyOrder(data) + }) + + it('fail safely when emitted with an unexpected structure', async () => { + // Enqueue notification with invalid structure + const msgId = await publishNotification(NotificationType.MentionedInComment, { + a: 1, + b: 2 + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } as any) + + const { err } = await notificationsState.waitForMsgAck(msgId) + + expect(err).to.be.ok + expect(err instanceof InvalidNotificationError).to.be.true + expect(err?.message).to.contain('invalid notification') + }) + + it('fail safely when emitted with an unexpected type', async () => { + // Enqueue notification with invalid structure + const msgId = await publishNotification('booooooooo' as NotificationType, { + targetUserId: '123', + data: { + a: 123 + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } as any + }) + + const { err } = await notificationsState.waitForMsgAck(msgId) + expect(err).to.be.ok + expect(err instanceof UnhandledNotificationError).to.be.true + }) + + const validationErrorDataSet = [ + { + display: 'successful', + error: new NotificationValidationError('expected validation isue') + }, + { display: 'unsuccessful', error: new Error('ooohhhh') } + ] + + validationErrorDataSet.forEach(({ display, error }) => { + it(`fail with ${display} ack when handler throws ${error.name}`, async () => { + const data: MentionedInCommentData = { + threadId: 'aaa', + commentId: 'bbb', + authorId: 'ccc', + streamId: 'ddd' + } + + getEventBus().listenOnce(NotificationsEvents.Received, () => { + throw error + }) + + const msgId = await publishNotification(NotificationType.MentionedInComment, { + targetUserId: '123', + data + }) + + const { err, result } = await notificationsState.waitForMsgAck(msgId) + + const isValidationError = error instanceof NotificationValidationError + if (isValidationError) { + expect(err).to.be.not.ok + expect(result?.status).to.eq(NotificationJobResultsStatus.ValidationError) + } else { + expect(err).to.be.ok + expect(err?.name).to.eq(error.name) + expect(err?.message).to.eq(error.message) + expect(result).to.be.not.ok + } + }) + }) + }) + : {} diff --git a/packages/server/modules/shared/helpers/envHelper.ts b/packages/server/modules/shared/helpers/envHelper.ts index 54d058d81..79ee49e66 100644 --- a/packages/server/modules/shared/helpers/envHelper.ts +++ b/packages/server/modules/shared/helpers/envHelper.ts @@ -558,3 +558,6 @@ export function getOdaUserSecret() { export const areSavedViewsEnabled = (): boolean => getFeatureFlags().FF_SAVED_VIEWS_ENABLED + +export const isNotificationListenerEnabled = (): boolean => + getFeatureFlags().FF_NOTIFICATION_LISTENER_ENABLED diff --git a/packages/server/modules/shared/services/eventBus.ts b/packages/server/modules/shared/services/eventBus.ts index 9fbeab651..900365309 100644 --- a/packages/server/modules/shared/services/eventBus.ts +++ b/packages/server/modules/shared/services/eventBus.ts @@ -154,6 +154,14 @@ export type EventPayload = T extends AllEventsWi ? EventPayloadsMap[T] : never +/** + * To single specify which event to use + */ +export type EventType = { + eventName: EventName + payload: EventTypes[EventName] +} + export function initializeEventBus() { const emitter = new EventEmitter({ wildcard: true }) @@ -163,10 +171,9 @@ export function initializeEventBus() { * execute. Any errors thrown in the listeners will bubble up and throw from * the part of code that triggers this emit() call. */ - emit: async (args: { - eventName: EventName - payload: EventTypes[EventName] - }): Promise => { + emit: async ( + args: EventType + ): Promise => { // curate the proper payload here and eventName object here, before emitting await emitter.emitAsync(args.eventName, args) }, diff --git a/packages/server/test/graphql/notifications.ts b/packages/server/test/graphql/notifications.ts new file mode 100644 index 000000000..6ce3a1b1d --- /dev/null +++ b/packages/server/test/graphql/notifications.ts @@ -0,0 +1,36 @@ +import gql from 'graphql-tag' + +export const getUserNotifications = gql` + query GetUserNotifications($limit: Int, $cursor: String) { + activeUser { + notifications(limit: $limit, cursor: $cursor) { + items { + id + type + createdAt + payload + read + updatedAt + } + cursor + totalCount + } + } + } +` + +export const deleteUserNotifications = gql` + mutation UserBulkDeleteNotidication($ids: [String!]!) { + notificationMutations { + bulkDelete(ids: $ids) + } + } +` + +export const updateUserNotifications = gql` + mutation UserBulkUpdateNotifications($input: [NotificationUpdateInput!]!) { + notificationMutations { + bulkUpdate(input: $input) + } + } +` diff --git a/packages/server/test/notificationsHelper.ts b/packages/server/test/notificationsHelper.ts index 74ade674d..6d9652bc3 100644 --- a/packages/server/test/notificationsHelper.ts +++ b/packages/server/test/notificationsHelper.ts @@ -1,6 +1,6 @@ import { notificationsLogger as logger } from '@/observability/logging' -import type { NotificationJobResult } from '@/modules/notifications/services/queue' -import { getQueue } from '@/modules/notifications/services/queue' +import type { NotificationJobResult } from '@/modules/notifications/services/publication/queue' +import { getQueue } from '@/modules/notifications/services/publication/queue' import { EventEmitter } from 'events' import type { CompletedEventCallback, FailedEventCallback, JobId } from 'bull' import { pick } from 'lodash-es' diff --git a/packages/shared/package.json b/packages/shared/package.json index 71ce687ca..a07f4b4c8 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -108,6 +108,7 @@ "./acc": "./src/acc/index.ts", "./dist/*": "./dist/*", "./images/base64": "./src/images/base64.ts", + "./notifications": "./src/notifications/index.ts", "./saved-views": "./src/saved-views/index.ts" }, "exclude": [ @@ -345,6 +346,16 @@ "default": "./dist/commonjs/images/base64.js" } }, + "./notifications": { + "import": { + "types": "./dist/esm/notifications/index.d.ts", + "default": "./dist/esm/notifications/index.js" + }, + "require": { + "types": "./dist/commonjs/notifications/index.d.ts", + "default": "./dist/commonjs/notifications/index.js" + } + }, "./saved-views": { "import": { "types": "./dist/esm/saved-views/index.d.ts", diff --git a/packages/shared/src/core/constants.ts b/packages/shared/src/core/constants.ts index eda70fbb4..92d26f5d5 100644 --- a/packages/shared/src/core/constants.ts +++ b/packages/shared/src/core/constants.ts @@ -120,6 +120,10 @@ export const Scopes = Object.freeze({ Email: 'users:email', Invite: 'users:invite' }, + Notifications: { + Read: 'notifications:read', + Write: 'notifications:write' + }, Server: { Stats: 'server:stats', Setup: 'server:setup' @@ -153,6 +157,8 @@ export const Scopes = Object.freeze({ export type StreamScopes = (typeof Scopes)['Streams'][keyof (typeof Scopes)['Streams']] export type ProfileScopes = (typeof Scopes)['Profile'][keyof (typeof Scopes)['Profile']] export type UserScopes = (typeof Scopes)['Users'][keyof (typeof Scopes)['Users']] +export type NotificationScopes = + (typeof Scopes)['Notifications'][keyof (typeof Scopes)['Notifications']] export type ServerScopes = (typeof Scopes)['Server'][keyof (typeof Scopes)['Server']] export type TokenScopes = (typeof Scopes)['Tokens'][keyof (typeof Scopes)['Tokens']] export type AppScopes = (typeof Scopes)['Apps'][keyof (typeof Scopes)['Apps']] @@ -175,6 +181,7 @@ export type AvailableScopes = | AutomateScopes | AutomateFunctionScopes | WorkspaceScopes + | NotificationScopes /** * All scopes diff --git a/packages/shared/src/environment/featureFlags.ts b/packages/shared/src/environment/featureFlags.ts index 8e1d34314..d7a65c77c 100644 --- a/packages/shared/src/environment/featureFlags.ts +++ b/packages/shared/src/environment/featureFlags.ts @@ -22,4 +22,5 @@ export type FeatureFlags = { FF_DASHBOARDS_MODULE_ENABLED: boolean FF_SAVED_VIEWS_ENABLED: boolean FF_USERS_INVITE_SCOPE_IS_PUBLIC: boolean + FF_NOTIFICATION_LISTENER_ENABLED: boolean } diff --git a/packages/shared/src/environment/index.ts b/packages/shared/src/environment/index.ts index c2317e828..cd632ccf7 100644 --- a/packages/shared/src/environment/index.ts +++ b/packages/shared/src/environment/index.ts @@ -150,6 +150,11 @@ export const parseFeatureFlags = ( description: 'Enables Personal Access Tokens (PAT) to be created with users:invite scope. **WARNING** This can be used to spam invitations to any email address. It is not advised to enable this on servers which are open to public account registration or to which untrusted users have been, or can be, invited.', defaults: { _: false } + }, + FF_NOTIFICATION_LISTENER_ENABLED: { + schema: z.boolean(), + description: 'Enables notifications being triggered by server event listeners', + defaults: { _: false } } }) diff --git a/packages/shared/src/notifications/helpers/types.ts b/packages/shared/src/notifications/helpers/types.ts new file mode 100644 index 000000000..706529f37 --- /dev/null +++ b/packages/shared/src/notifications/helpers/types.ts @@ -0,0 +1,26 @@ +export enum NotificationType { + /** + * @deprecated ActivityDigest will be removed in a future release + */ + ActivityDigest = 'activityDigest', + MentionedInComment = 'mentionedInComment', + NewStreamAccessRequest = 'newStreamAccessRequest', + StreamAccessRequestApproved = 'streamAccessRequestApproved' +} + +export type NotificationPayloadMap = { + [NotificationType.MentionedInComment]: { + threadId: string + authorId: string + commentId: string + streamId: string + } + [NotificationType.NewStreamAccessRequest]: { + streamId: string + requesterId: string + } + [NotificationType.StreamAccessRequestApproved]: { + streamId: string + } + [NotificationType.ActivityDigest]: Record +} diff --git a/packages/shared/src/notifications/index.ts b/packages/shared/src/notifications/index.ts new file mode 100644 index 000000000..648d18359 --- /dev/null +++ b/packages/shared/src/notifications/index.ts @@ -0,0 +1 @@ +export * from './helpers/types.js' diff --git a/utils/helm/speckle-server/templates/_helpers.tpl b/utils/helm/speckle-server/templates/_helpers.tpl index 661150e86..b027378d3 100644 --- a/utils/helm/speckle-server/templates/_helpers.tpl +++ b/utils/helm/speckle-server/templates/_helpers.tpl @@ -613,6 +613,9 @@ Generate the environment variables for Speckle server and Speckle objects deploy - name: FF_NO_PERSONAL_EMAILS_ENABLED value: {{ .Values.featureFlags.noPersonalEmailsEnabled | quote }} +- name: FF_NOTIFICATION_LISTENER_ENABLED + value: {{ .Values.featureFlags.notificationListenerEnabled | quote }} + {{- if .Values.featureFlags.accIntegrationEnabled }} - name: AUTODESK_INTEGRATION_CLIENT_ID value: {{ .Values.server.accIntegration.client_id }}