From 53844db17b46bf45568402e89ad815c5077ee495 Mon Sep 17 00:00:00 2001 From: Kristaps Fabians Geikins Date: Mon, 14 Oct 2024 12:00:47 +0300 Subject: [PATCH] chore(server): core IoC #43 - scheduleExecutionFactory --- .../server/modules/activitystream/index.ts | 10 +++- .../core/domain/scheduledTasks/operations.ts | 13 +++++ .../core/domain/scheduledTasks/types.ts | 3 ++ .../core/repositories/scheduledTasks.ts | 29 ++++++---- .../modules/core/services/taskScheduler.ts | 54 ++++++++++--------- .../modules/core/tests/scheduledTasks.spec.ts | 8 ++- packages/server/modules/webhooks/index.ts | 8 ++- 7 files changed, 83 insertions(+), 42 deletions(-) create mode 100644 packages/server/modules/core/domain/scheduledTasks/operations.ts create mode 100644 packages/server/modules/core/domain/scheduledTasks/types.ts diff --git a/packages/server/modules/activitystream/index.ts b/packages/server/modules/activitystream/index.ts index 3f3b16ab3..cb8a38d19 100644 --- a/packages/server/modules/activitystream/index.ts +++ b/packages/server/modules/activitystream/index.ts @@ -1,7 +1,6 @@ import { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper' import { initializeEventListenerFactory } from '@/modules/activitystream/services/eventListener' import { publishNotification } from '@/modules/notifications/services/publication' -import { scheduleExecution } from '@/modules/core/services/taskScheduler' import { activitiesLogger, moduleLogger } from '@/logging/logging' import { weeklyEmailDigestEnabled } from '@/modules/shared/helpers/envHelper' import { getEventBus } from '@/modules/shared/services/eventBus' @@ -19,8 +18,11 @@ import { addStreamAccessRequestDeclinedActivityFactory, addStreamAccessRequestedActivityFactory } from '@/modules/activitystream/services/accessRequestActivity' +import { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations' +import { scheduleExecutionFactory } from '@/modules/core/services/taskScheduler' +import { acquireTaskLockFactory } from '@/modules/core/repositories/scheduledTasks' -let scheduledTask: ReturnType | null = null +let scheduledTask: ReturnType | null = null let quitEventListeners: Optional> = undefined @@ -41,6 +43,10 @@ const initializeEventListeners = () => { } const scheduleWeeklyActivityNotifications = () => { + const scheduleExecution = scheduleExecutionFactory({ + acquireTaskLock: acquireTaskLockFactory({ db }) + }) + // just to test stuff // every 1000 seconds // const cronExpression = '*/1000 * * * * *' diff --git a/packages/server/modules/core/domain/scheduledTasks/operations.ts b/packages/server/modules/core/domain/scheduledTasks/operations.ts new file mode 100644 index 000000000..e1f51a858 --- /dev/null +++ b/packages/server/modules/core/domain/scheduledTasks/operations.ts @@ -0,0 +1,13 @@ +import { ScheduledTask } from '@/modules/core/domain/scheduledTasks/types' +import cron from 'node-cron' + +export type AcquireTaskLock = ( + scheduledTask: ScheduledTask +) => Promise + +export type ScheduleExecution = ( + cronExpression: string, + taskName: string, + callback: (scheduledTime: Date) => Promise, + lockTimeout?: number +) => cron.ScheduledTask diff --git a/packages/server/modules/core/domain/scheduledTasks/types.ts b/packages/server/modules/core/domain/scheduledTasks/types.ts new file mode 100644 index 000000000..e2b6d11ad --- /dev/null +++ b/packages/server/modules/core/domain/scheduledTasks/types.ts @@ -0,0 +1,3 @@ +import { ScheduledTaskRecord } from '@/modules/core/helpers/types' + +export type ScheduledTask = ScheduledTaskRecord diff --git a/packages/server/modules/core/repositories/scheduledTasks.ts b/packages/server/modules/core/repositories/scheduledTasks.ts index 7fcd8cc6d..eb7f2ee96 100644 --- a/packages/server/modules/core/repositories/scheduledTasks.ts +++ b/packages/server/modules/core/repositories/scheduledTasks.ts @@ -1,15 +1,22 @@ import { ScheduledTasks } from '@/modules/core/dbSchema' +import { AcquireTaskLock } from '@/modules/core/domain/scheduledTasks/operations' import { ScheduledTaskRecord } from '@/modules/core/helpers/types' +import { Knex } from 'knex' -export async function acquireTaskLock( - scheduledTask: ScheduledTaskRecord -): Promise { - const now = new Date() - const [lock] = await ScheduledTasks.knex() - .insert(scheduledTask) - .onConflict(ScheduledTasks.withoutTablePrefix.col.taskName) - .merge() - .where(ScheduledTasks.col.lockExpiresAt, '<', now) - .returning('*') - return (lock as ScheduledTaskRecord) ?? null +const tables = { + scheduledTasks: (db: Knex) => db(ScheduledTasks.name) } + +export const acquireTaskLockFactory = + (deps: { db: Knex }): AcquireTaskLock => + async (scheduledTask: ScheduledTaskRecord): Promise => { + const now = new Date() + const [lock] = await tables + .scheduledTasks(deps.db) + .insert(scheduledTask) + .onConflict(ScheduledTasks.withoutTablePrefix.col.taskName) + .merge() + .where(ScheduledTasks.col.lockExpiresAt, '<', now) + .returning('*') + return (lock as ScheduledTaskRecord) ?? null + } diff --git a/packages/server/modules/core/services/taskScheduler.ts b/packages/server/modules/core/services/taskScheduler.ts index 30f6e50bb..3b0a7163b 100644 --- a/packages/server/modules/core/services/taskScheduler.ts +++ b/packages/server/modules/core/services/taskScheduler.ts @@ -1,18 +1,18 @@ import cron from 'node-cron' import { InvalidArgumentError } from '@/modules/shared/errors' import { ensureError } from '@/modules/shared/helpers/errorHelper' -import { acquireTaskLock } from '@/modules/core/repositories/scheduledTasks' -import { ScheduledTaskRecord } from '@/modules/core/helpers/types' import { activitiesLogger } from '@/logging/logging' +import { + AcquireTaskLock, + ScheduleExecution +} from '@/modules/core/domain/scheduledTasks/operations' export const scheduledCallbackWrapper = async ( scheduledTime: Date, taskName: string, lockTimeout: number, callback: (scheduledTime: Date) => Promise, - acquireLock: ( - scheduledTask: ScheduledTaskRecord - ) => Promise + acquireLock: AcquireTaskLock ) => { const boundLogger = activitiesLogger.child({ taskName }) // try to acquire the task lock with the function name and a new expiration date @@ -48,24 +48,26 @@ export const scheduledCallbackWrapper = async ( } } -export const scheduleExecution = ( - cronExpression: string, - taskName: string, - callback: (scheduledTime: Date) => Promise, - lockTimeout = 60 * 1000 -): cron.ScheduledTask => { - const expressionValid = cron.validate(cronExpression) - if (!expressionValid) - throw new InvalidArgumentError( - `The given cron expression ${cronExpression} is not valid` - ) - return cron.schedule(cronExpression, async (scheduledTime: Date) => { - await scheduledCallbackWrapper( - scheduledTime, - taskName, - lockTimeout, - callback, - acquireTaskLock - ) - }) -} +export const scheduleExecutionFactory = + (deps: { acquireTaskLock: AcquireTaskLock }): ScheduleExecution => + ( + cronExpression: string, + taskName: string, + callback: (scheduledTime: Date) => Promise, + lockTimeout = 60 * 1000 + ): cron.ScheduledTask => { + const expressionValid = cron.validate(cronExpression) + if (!expressionValid) + throw new InvalidArgumentError( + `The given cron expression ${cronExpression} is not valid` + ) + return cron.schedule(cronExpression, async (scheduledTime: Date) => { + await scheduledCallbackWrapper( + scheduledTime, + taskName, + lockTimeout, + callback, + deps.acquireTaskLock + ) + }) + } diff --git a/packages/server/modules/core/tests/scheduledTasks.spec.ts b/packages/server/modules/core/tests/scheduledTasks.spec.ts index b88a43979..3aa95fc28 100644 --- a/packages/server/modules/core/tests/scheduledTasks.spec.ts +++ b/packages/server/modules/core/tests/scheduledTasks.spec.ts @@ -1,15 +1,19 @@ import { describe } from 'mocha' import { ScheduledTasks } from '@/modules/core/dbSchema' import { truncateTables } from '@/test/hooks' -import { acquireTaskLock } from '@/modules/core/repositories/scheduledTasks' import { ensureError } from '@/modules/shared/helpers/errorHelper' import { scheduledCallbackWrapper, - scheduleExecution + scheduleExecutionFactory } from '@/modules/core/services/taskScheduler' import { expect } from 'chai' import { sleep } from '@/test/helpers' import cryptoRandomString from 'crypto-random-string' +import { acquireTaskLockFactory } from '@/modules/core/repositories/scheduledTasks' +import { db } from '@/db/knex' + +const acquireTaskLock = acquireTaskLockFactory({ db }) +const scheduleExecution = scheduleExecutionFactory({ acquireTaskLock }) describe('Scheduled tasks @core', () => { describe('Task lock repository', () => { diff --git a/packages/server/modules/webhooks/index.ts b/packages/server/modules/webhooks/index.ts index 5eb432d18..ab9f214f6 100644 --- a/packages/server/modules/webhooks/index.ts +++ b/packages/server/modules/webhooks/index.ts @@ -1,10 +1,16 @@ import cron from 'node-cron' import { SpeckleModule } from '@/modules/shared/helpers/typeHelper' -import { scheduleExecution } from '@/modules/core/services/taskScheduler' import { cleanOrphanedWebhookConfigs } from '@/modules/webhooks/services/cleanup' import { activitiesLogger, moduleLogger } from '@/logging/logging' +import { scheduleExecutionFactory } from '@/modules/core/services/taskScheduler' +import { acquireTaskLockFactory } from '@/modules/core/repositories/scheduledTasks' +import { db } from '@/db/knex' const scheduleWebhookCleanup = () => { + const scheduleExecution = scheduleExecutionFactory({ + acquireTaskLock: acquireTaskLockFactory({ db }) + }) + const cronExpression = '0 4 * * 1' return scheduleExecution(cronExpression, 'weeklyWebhookCleanup', async () => { activitiesLogger.info('Starting weekly webhooks cleanup')