Files
speckle-server/packages/server/modules/notifications/services/queue.ts
T
Gergő Jedlicska d1d5984e30 gergo/summaryEmails (#979)
* refactor(server emails): email transports module refactor to TypeScript

* refactor(docker-compose deps): move local email server to common dev compose file

* chore(server launch.json): add ts-node script running example

* chore(server deps): add nodemailer types package

* refactor(server activitystream): add strongly typed activity definitions

* feat(server activitystream): add activity repository

* feat(server info): add canonical url on the service level

* feat(server): add static file serving route to server core

* feat(server): add dependencies for periodical email digests

* feat(server activity stream): call the initialization step from the activity stream module

* feat(server activity digest): add WIP weekly email digest implementation

* feat(server digest email): small upgrades and fixes to the email template and its contents

* just for Fabs to test

* chore(root package.json): remove deleted docker-compose references

* feat(frontend profile): add notification preferences panel

* feat(server digest emails): set prod ready cron tab and timespan

* refactor(server email digest): move templates into the email module

* refactor(server activity digests): refactor to use notifications infrastructure

* test(server activities): add tests and some refactor to activities and notification preferences

* refactor(notification preferences): fix minor issues

* test(server notification preferences test): fix describe nesting

* fix(server activities): add missing action types

* fix(server activities): fix errors after merging main

* test(server activity notifications): add test coverage for activity notifications service

* refactor(server activities): fixing tests and some cleanup

* feat(server cli): add summary notification command to cli

* chore(dev env db versions): upgrade local dev env versions

* chore(server deps): upgrade local dev db to pg 14

* fix(docker-compose): bind maildev to localhost

* process-scoped notifications test queues

* test(activity tests): add sleep to fix flaky CI

* feat(activity digests): add demo date for digest trigger

* feat(activity digest): add UK timezone trigger date

Co-authored-by: Iain Sproat <68657+iainsproat@users.noreply.github.com>
Co-authored-by: Fabians <fabis94@live.com>
2022-09-09 12:46:57 +02:00

172 lines
4.8 KiB
TypeScript

import { UninitializedResourceAccessError } from '@/modules/shared/errors'
import { Optional } from '@/modules/shared/helpers/typeHelper'
import {
InvalidNotificationError,
NotificationValidationError,
UnhandledNotificationError
} from '@/modules/notifications/errors'
import {
isNotificationMessage,
NotificationHandler,
NotificationMessage,
NotificationType,
NotificationTypeHandlers
} from '@/modules/notifications/helpers/types'
import { notificationsDebug } from '@/modules/shared/utils/logger'
import { isProdEnv, isTestEnv } from '@/modules/shared/helpers/envHelper'
import Bull from 'bull'
import { buildBaseQueueOptions } from '@/modules/shared/helpers/bullHelper'
import cryptoRandomString from 'crypto-random-string'
/**
 * Value that the notifications queue job processor resolves with once a job
 * has been handled (see consumeIncomingNotifications).
 */
export type NotificationJobResult = {
// Outcome of processing the job
status: NotificationJobResultsStatus
// Type of the processed notification; undefined when processing failed
// before the type could be read from the job payload
type: NotificationType | undefined
}
/**
 * Possible outcomes reported through NotificationJobResult.status.
 */
export enum NotificationJobResultsStatus {
// The notification handler finished without throwing
Success = 'success',
// A NotificationValidationError was thrown while processing the job
ValidationError = 'validation-error'
}
/** Handlers added through registerNotificationHandlers(), keyed by notification type */
const handlers = new Map<NotificationType, NotificationHandler>()

const NOTIFICATIONS_QUEUE_MAIN_BASE = 'default:user-notifications'
const NOTIFICATIONS_QUEUE_TEST_BASE = 'test:user-notifications'

/** Random suffix generated once per module load; only used for the test queue name */
const PROCESS_ID = cryptoRandomString({ length: 5 })

/**
 * Name of the Bull queue used for notifications. In the test environment the
 * name gets the random PROCESS_ID suffix appended, otherwise the main base
 * name is used as is.
 */
export const NOTIFICATIONS_QUEUE = isTestEnv()
  ? NOTIFICATIONS_QUEUE_TEST_BASE + ':' + PROCESS_ID
  : NOTIFICATIONS_QUEUE_MAIN_BASE

// The test queue name is random, so print it along with the command to monitor it
if (isTestEnv()) {
  console.log('Notifications test queue ID: ' + NOTIFICATIONS_QUEUE)
  console.log(`Monitor using: 'yarn cli bull monitor ${NOTIFICATIONS_QUEUE}'`)
}

/** Module-level queue instance; stays undefined until initializeQueue() is called */
let queue: Optional<Bull.Queue>
/**
 * Create a Bull queue configured for notification jobs.
 *
 * @param queueName Name of the queue to create
 * @returns New Bull queue instance
 */
export const buildNotificationsQueue = (queueName: string) => {
  const baseOptions = buildBaseQueueOptions()

  // The limiter is only configured outside of the test environment
  // (presumably max 10 jobs per 1000ms, per Bull's limiter option - confirm against Bull docs)
  const limiterOptions = isTestEnv()
    ? {}
    : {
        limiter: {
          max: 10,
          duration: 1000
        }
      }

  return new Bull(queueName, {
    ...baseOptions,
    ...limiterOptions,
    defaultJobOptions: {
      attempts: 1,
      timeout: 10 * 1000, // 10s execution timeout
      // Completed/failed jobs are only auto-removed in the production environment
      removeOnComplete: isProdEnv(),
      removeOnFail: isProdEnv()
    }
  })
}
/**
 * Access the module-level notifications queue.
 *
 * @returns The queue instance previously assigned by initializeQueue()
 * @throws UninitializedResourceAccessError if no queue instance is set
 */
export function getQueue(): Bull.Queue {
  if (queue) return queue

  throw new UninitializedResourceAccessError(
    'Attempting to use uninitialized Bull queue'
  )
}
/**
 * Build the queue named NOTIFICATIONS_QUEUE and store it as the module-level
 * instance that getQueue() returns.
 */
export function initializeQueue() {
  const notificationsQueue = buildNotificationsQueue(NOTIFICATIONS_QUEUE)
  queue = notificationsQueue
}
/**
 * Add a notification message to the notifications queue as a new job.
 *
 * @param message Notification message that becomes the job's data
 * @returns ID of the job that was created
 * @throws UninitializedResourceAccessError (from getQueue) if the queue isn't initialized
 */
export async function publishMessage(message: NotificationMessage) {
  const { id } = await getQueue().add(message)
  return id
}
/**
 * Register notification message handlers for various notification types.
 *
 * The param is typed so that you can't add mismatched notification types to the wrong handlers.
 * To adjust which NotificationType values are mapped to which messages and handlers
 * see NotificationTypeMessageMap.
 *
 * Registering a handler for a type that already has one replaces the previous handler.
 * Keys whose value is undefined are ignored.
 *
 * @param newHandlers Handlers to register, keyed by notification type
 */
export function registerNotificationHandlers(
  newHandlers: Partial<NotificationTypeHandlers>
) {
  for (const [type, handler] of Object.entries(newHandlers)) {
    // Partial<> permits keys that are present but explicitly set to undefined.
    // Storing those would put a non-handler into the map and wipe out any
    // handler registered earlier for the same type, so skip them.
    if (!handler) continue

    handlers.set(type as NotificationType, handler)
  }
}
/**
 * Attach the job processor to the notifications queue, so that incoming
 * notification messages get dispatched to their registered handlers.
 *
 * For every job:
 * - the job data is checked with isNotificationMessage(); anything that doesn't
 *   pass results in an InvalidNotificationError
 * - the handler registered for the message type is looked up; if there is none,
 *   an UnhandledNotificationError is thrown
 * - the handler is invoked with the message, the job and a type-scoped debug logger
 *
 * A NotificationValidationError raised during processing is not re-thrown - the
 * processor resolves with a `validation-error` result instead. Any other error is
 * re-thrown from the processor (non-Error values are replaced by a generic Error).
 *
 * @returns Nothing - the function only registers the processor on the queue
 * @throws UninitializedResourceAccessError (from getQueue) if the queue isn't initialized
 */
export async function consumeIncomingNotifications() {
  const notificationsQueue = getQueue()

  // The promise returned by process() is not awaited (same as before); `void` only
  // makes that explicit. NOTE(review): confirm against Bull docs when it settles.
  void notificationsQueue.process(async (job): Promise<NotificationJobResult> => {
    // Declared outside of the try block so the catch branch can still report it
    let resolvedType: Optional<NotificationType>

    try {
      notificationsDebug('New notification received...')

      // Validate the raw job data
      const rawPayload = job.data
      const message = isNotificationMessage(rawPayload) ? rawPayload : undefined
      if (!message) {
        throw new InvalidNotificationError('Received an invalid notification', {
          info: {
            payload: rawPayload
          }
        })
      }

      // Find the handler registered for this notification type
      const { type } = message
      resolvedType = type
      const handler = handlers.get(type)
      if (!handler) {
        throw new UnhandledNotificationError(null, {
          info: { payload: rawPayload, type }
        })
      }

      // Run the handler with a debug logger scoped to the notification type
      const typeDebug = notificationsDebug.extend(type)
      typeDebug('Starting processing notification...')
      await Promise.resolve(handler(message, { job, debug: typeDebug }))
      typeDebug('...successfully processed notification')

      return {
        status: NotificationJobResultsStatus.Success,
        type
      }
    } catch (e: unknown) {
      notificationsDebug(e)

      const failure =
        e instanceof Error ? e : new Error('Unexpected notification consumption error')

      // Validation errors are reported through the job result, not as a thrown error
      if (failure instanceof NotificationValidationError) {
        return {
          status: NotificationJobResultsStatus.ValidationError,
          type: resolvedType
        }
      }

      throw failure
    }
  })
}
/**
 * Close the notifications queue, if one has been initialized, and forget the
 * module-level reference to it.
 *
 * Clearing the reference means that getQueue() reports the queue as
 * uninitialized after a shutdown instead of handing out a closed instance, and
 * that calling this function again is a no-op. If close() rejects, the
 * reference is left in place and the rejection propagates to the caller.
 */
export async function shutdownQueue() {
  const closingQueue = queue
  if (!closingQueue) return

  await closingQueue.close()

  // Only clear the reference if it still points at the instance we just closed;
  // initializeQueue() may have assigned a fresh queue while close() was pending
  if (queue === closingQueue) {
    queue = undefined
  }
}