Merge pull request #3256 from specklesystems/fabians/core-ioc-43

chore(server): core IoC #43 - scheduleExecutionFactory
This commit is contained in:
Alessandro Magionami
2024-10-14 11:22:18 +02:00
committed by GitHub
7 changed files with 83 additions and 42 deletions
@@ -1,7 +1,6 @@
import { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { initializeEventListenerFactory } from '@/modules/activitystream/services/eventListener'
import { publishNotification } from '@/modules/notifications/services/publication'
import { scheduleExecution } from '@/modules/core/services/taskScheduler'
import { activitiesLogger, moduleLogger } from '@/logging/logging'
import { weeklyEmailDigestEnabled } from '@/modules/shared/helpers/envHelper'
import { getEventBus } from '@/modules/shared/services/eventBus'
@@ -19,8 +18,11 @@ import {
addStreamAccessRequestDeclinedActivityFactory,
addStreamAccessRequestedActivityFactory
} from '@/modules/activitystream/services/accessRequestActivity'
import { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations'
import { scheduleExecutionFactory } from '@/modules/core/services/taskScheduler'
import { acquireTaskLockFactory } from '@/modules/core/repositories/scheduledTasks'
let scheduledTask: ReturnType<typeof scheduleExecution> | null = null
let scheduledTask: ReturnType<ScheduleExecution> | null = null
let quitEventListeners: Optional<ReturnType<typeof initializeEventListeners>> =
undefined
@@ -41,6 +43,10 @@ const initializeEventListeners = () => {
}
const scheduleWeeklyActivityNotifications = () => {
const scheduleExecution = scheduleExecutionFactory({
acquireTaskLock: acquireTaskLockFactory({ db })
})
// just to test stuff
// every 1000 seconds
// const cronExpression = '*/1000 * * * * *'
@@ -0,0 +1,13 @@
import { ScheduledTask } from '@/modules/core/domain/scheduledTasks/types'
import cron from 'node-cron'
export type AcquireTaskLock = (
scheduledTask: ScheduledTask
) => Promise<ScheduledTask | null>
export type ScheduleExecution = (
cronExpression: string,
taskName: string,
callback: (scheduledTime: Date) => Promise<void>,
lockTimeout?: number
) => cron.ScheduledTask
@@ -0,0 +1,3 @@
import { ScheduledTaskRecord } from '@/modules/core/helpers/types'
export type ScheduledTask = ScheduledTaskRecord
@@ -1,15 +1,22 @@
import { ScheduledTasks } from '@/modules/core/dbSchema'
import { AcquireTaskLock } from '@/modules/core/domain/scheduledTasks/operations'
import { ScheduledTaskRecord } from '@/modules/core/helpers/types'
import { Knex } from 'knex'
export async function acquireTaskLock(
scheduledTask: ScheduledTaskRecord
): Promise<ScheduledTaskRecord | null> {
const now = new Date()
const [lock] = await ScheduledTasks.knex<ScheduledTaskRecord>()
.insert(scheduledTask)
.onConflict(ScheduledTasks.withoutTablePrefix.col.taskName)
.merge()
.where(ScheduledTasks.col.lockExpiresAt, '<', now)
.returning('*')
return (lock as ScheduledTaskRecord) ?? null
const tables = {
scheduledTasks: (db: Knex) => db<ScheduledTaskRecord>(ScheduledTasks.name)
}
export const acquireTaskLockFactory =
(deps: { db: Knex }): AcquireTaskLock =>
async (scheduledTask: ScheduledTaskRecord): Promise<ScheduledTaskRecord | null> => {
const now = new Date()
const [lock] = await tables
.scheduledTasks(deps.db)
.insert(scheduledTask)
.onConflict(ScheduledTasks.withoutTablePrefix.col.taskName)
.merge()
.where(ScheduledTasks.col.lockExpiresAt, '<', now)
.returning('*')
return (lock as ScheduledTaskRecord) ?? null
}
@@ -1,18 +1,18 @@
import cron from 'node-cron'
import { InvalidArgumentError } from '@/modules/shared/errors'
import { ensureError } from '@/modules/shared/helpers/errorHelper'
import { acquireTaskLock } from '@/modules/core/repositories/scheduledTasks'
import { ScheduledTaskRecord } from '@/modules/core/helpers/types'
import { activitiesLogger } from '@/logging/logging'
import {
AcquireTaskLock,
ScheduleExecution
} from '@/modules/core/domain/scheduledTasks/operations'
export const scheduledCallbackWrapper = async (
scheduledTime: Date,
taskName: string,
lockTimeout: number,
callback: (scheduledTime: Date) => Promise<void>,
acquireLock: (
scheduledTask: ScheduledTaskRecord
) => Promise<ScheduledTaskRecord | null>
acquireLock: AcquireTaskLock
) => {
const boundLogger = activitiesLogger.child({ taskName })
// try to acquire the task lock with the function name and a new expiration date
@@ -48,24 +48,26 @@ export const scheduledCallbackWrapper = async (
}
}
export const scheduleExecution = (
cronExpression: string,
taskName: string,
callback: (scheduledTime: Date) => Promise<void>,
lockTimeout = 60 * 1000
): cron.ScheduledTask => {
const expressionValid = cron.validate(cronExpression)
if (!expressionValid)
throw new InvalidArgumentError(
`The given cron expression ${cronExpression} is not valid`
)
return cron.schedule(cronExpression, async (scheduledTime: Date) => {
await scheduledCallbackWrapper(
scheduledTime,
taskName,
lockTimeout,
callback,
acquireTaskLock
)
})
}
export const scheduleExecutionFactory =
(deps: { acquireTaskLock: AcquireTaskLock }): ScheduleExecution =>
(
cronExpression: string,
taskName: string,
callback: (scheduledTime: Date) => Promise<void>,
lockTimeout = 60 * 1000
): cron.ScheduledTask => {
const expressionValid = cron.validate(cronExpression)
if (!expressionValid)
throw new InvalidArgumentError(
`The given cron expression ${cronExpression} is not valid`
)
return cron.schedule(cronExpression, async (scheduledTime: Date) => {
await scheduledCallbackWrapper(
scheduledTime,
taskName,
lockTimeout,
callback,
deps.acquireTaskLock
)
})
}
@@ -1,15 +1,19 @@
import { describe } from 'mocha'
import { ScheduledTasks } from '@/modules/core/dbSchema'
import { truncateTables } from '@/test/hooks'
import { acquireTaskLock } from '@/modules/core/repositories/scheduledTasks'
import { ensureError } from '@/modules/shared/helpers/errorHelper'
import {
scheduledCallbackWrapper,
scheduleExecution
scheduleExecutionFactory
} from '@/modules/core/services/taskScheduler'
import { expect } from 'chai'
import { sleep } from '@/test/helpers'
import cryptoRandomString from 'crypto-random-string'
import { acquireTaskLockFactory } from '@/modules/core/repositories/scheduledTasks'
import { db } from '@/db/knex'
const acquireTaskLock = acquireTaskLockFactory({ db })
const scheduleExecution = scheduleExecutionFactory({ acquireTaskLock })
describe('Scheduled tasks @core', () => {
describe('Task lock repository', () => {
+7 -1
View File
@@ -1,10 +1,16 @@
import cron from 'node-cron'
import { SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { scheduleExecution } from '@/modules/core/services/taskScheduler'
import { cleanOrphanedWebhookConfigs } from '@/modules/webhooks/services/cleanup'
import { activitiesLogger, moduleLogger } from '@/logging/logging'
import { scheduleExecutionFactory } from '@/modules/core/services/taskScheduler'
import { acquireTaskLockFactory } from '@/modules/core/repositories/scheduledTasks'
import { db } from '@/db/knex'
const scheduleWebhookCleanup = () => {
const scheduleExecution = scheduleExecutionFactory({
acquireTaskLock: acquireTaskLockFactory({ db })
})
const cronExpression = '0 4 * * 1'
return scheduleExecution(cronExpression, 'weeklyWebhookCleanup', async () => {
activitiesLogger.info('Starting weekly webhooks cleanup')