From 8df7eb714b6b4b6ed9fda752ea622bac3b923a91 Mon Sep 17 00:00:00 2001 From: Kristaps Fabians Geikins Date: Wed, 15 Jan 2025 11:00:33 +0200 Subject: [PATCH] chore(server): getting rid of module-scoped eventBuses - batch #5 - automations & runs [final] (#3818) * chore(server): getting rid of module-scoped eventBuses - batch #4 - comments * chore(server): getting rid of module-scoped eventBuses - batch #5 - automations * chore(server): getting rid of module-scoped eventBuses - batch #5 - automation runs --- .../server/modules/automate/domain/events.ts | 52 +++++++++++ .../modules/automate/events/automations.ts | 32 ------- .../server/modules/automate/events/runs.ts | 41 --------- .../automate/graph/resolvers/automate.ts | 14 ++- packages/server/modules/automate/index.ts | 88 ++++++++++--------- .../automate/services/automationManagement.ts | 66 ++++++++------ .../automate/services/runsManagement.ts | 23 ++--- .../modules/automate/services/trigger.ts | 25 +++--- .../automate/tests/automations.spec.ts | 35 +++++++- .../modules/automate/tests/trigger.spec.ts | 36 ++++++-- .../modules/shared/services/eventBus.ts | 8 ++ .../services/moduleEventEmitterSetup.ts | 81 ----------------- .../test/speckle-helpers/automationHelper.ts | 6 +- 13 files changed, 238 insertions(+), 269 deletions(-) create mode 100644 packages/server/modules/automate/domain/events.ts delete mode 100644 packages/server/modules/automate/events/automations.ts delete mode 100644 packages/server/modules/automate/events/runs.ts delete mode 100644 packages/server/modules/shared/services/moduleEventEmitterSetup.ts diff --git a/packages/server/modules/automate/domain/events.ts b/packages/server/modules/automate/domain/events.ts new file mode 100644 index 000000000..c0bda159a --- /dev/null +++ b/packages/server/modules/automate/domain/events.ts @@ -0,0 +1,52 @@ +import { + AutomationFunctionRunRecord, + AutomationRecord, + AutomationRunRecord, + AutomationTriggerType, + AutomationWithRevision, + BaseTriggerManifest, + RunTriggerSource +} from '@/modules/automate/helpers/types' +import { + InsertableAutomationRun, + StoredInsertableAutomationRevision +} from '@/modules/automate/repositories/automations' + +export const automationEventsNamespace = 'automations' as const +export const automationRunEventsNamespace = 'automationRuns' as const + +export const AutomationEvents = { + Created: `${automationEventsNamespace}.created`, + Updated: `${automationEventsNamespace}.updated`, + CreatedRevision: `${automationEventsNamespace}.created-revision` +} as const + +export const AutomationRunEvents = { + Created: `${automationRunEventsNamespace}.created`, + StatusUpdated: `${automationRunEventsNamespace}.status-updated` +} as const + +export type AutomationEventsPayloads = { + [AutomationEvents.Created]: { automation: AutomationRecord } + [AutomationEvents.Updated]: { automation: AutomationRecord } + [AutomationEvents.CreatedRevision]: { + automation: AutomationRecord + revision: StoredInsertableAutomationRevision + } +} + +export type AutomationRunEventsPayloads = { + [AutomationRunEvents.Created]: { + automation: AutomationWithRevision + run: InsertableAutomationRun + manifests: BaseTriggerManifest[] + source: RunTriggerSource + triggerType: AutomationTriggerType + } + [AutomationRunEvents.StatusUpdated]: { + run: AutomationRunRecord + functionRun: AutomationFunctionRunRecord + automationId: string + projectId: string + } +} diff --git a/packages/server/modules/automate/events/automations.ts b/packages/server/modules/automate/events/automations.ts deleted file mode 100644 index 73eac53b5..000000000 --- a/packages/server/modules/automate/events/automations.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { AutomationRecord } from '@/modules/automate/helpers/types' -import { StoredInsertableAutomationRevision } from '@/modules/automate/repositories/automations' -import { initializeModuleEventEmitter } from '@/modules/shared/services/moduleEventEmitterSetup' - -export enum AutomationsEvents { - Created = 'created', - Updated = 'updated', - CreatedRevision = 'created-revision' -} - -export type AutomationsEventsPayloads = { - [AutomationsEvents.Created]: { - automation: AutomationRecord - } - [AutomationsEvents.Updated]: { - automation: AutomationRecord - } - [AutomationsEvents.CreatedRevision]: { - automation: AutomationRecord - revision: StoredInsertableAutomationRevision - } -} - -const { emit, listen } = initializeModuleEventEmitter({ - moduleName: 'automate', - namespace: 'runs' -}) - -export const AutomationsEmitter = { emit, listen, events: AutomationsEvents } - -export type AutomationsEventsEmit = (typeof AutomationsEmitter)['emit'] -export type AutomationsEventsListen = (typeof AutomationsEmitter)['listen'] diff --git a/packages/server/modules/automate/events/runs.ts b/packages/server/modules/automate/events/runs.ts deleted file mode 100644 index 76d61041f..000000000 --- a/packages/server/modules/automate/events/runs.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { - AutomationFunctionRunRecord, - AutomationRunRecord, - AutomationTriggerType, - AutomationWithRevision, - BaseTriggerManifest, - RunTriggerSource -} from '@/modules/automate/helpers/types' -import { InsertableAutomationRun } from '@/modules/automate/repositories/automations' -import { initializeModuleEventEmitter } from '@/modules/shared/services/moduleEventEmitterSetup' - -export enum AutomateRunsEvents { - Created = 'created', - StatusUpdated = 'status-updated' -} - -export type AutomateEventsPayloads = { - [AutomateRunsEvents.Created]: { - automation: AutomationWithRevision - run: InsertableAutomationRun - manifests: BaseTriggerManifest[] - source: RunTriggerSource - triggerType: AutomationTriggerType - } - [AutomateRunsEvents.StatusUpdated]: { - run: AutomationRunRecord - functionRun: AutomationFunctionRunRecord - automationId: string - projectId: string - } -} - -const { emit, listen } = initializeModuleEventEmitter({ - moduleName: 'automate', - namespace: 'runs' -}) - -export const AutomateRunsEmitter = { emit, listen, events: AutomateRunsEvents } - -export type AutomateRunsEventsEmitter = (typeof AutomateRunsEmitter)['emit'] -export type AutomateRunsEventsListener = (typeof AutomateRunsEmitter)['listen'] diff --git a/packages/server/modules/automate/graph/resolvers/automate.ts b/packages/server/modules/automate/graph/resolvers/automate.ts index 9d79aad21..ee7344520 100644 --- a/packages/server/modules/automate/graph/resolvers/automate.ts +++ b/packages/server/modules/automate/graph/resolvers/automate.ts @@ -112,8 +112,6 @@ import { ExecutionEngineNetworkError } from '@/modules/automate/errors/executionEngine' import { db } from '@/db/knex' -import { AutomationsEmitter } from '@/modules/automate/events/automations' -import { AutomateRunsEmitter } from '@/modules/automate/events/runs' import { getCommitFactory } from '@/modules/core/repositories/commits' import { validateStreamAccessFactory } from '@/modules/core/services/streams/access' import { getUserFactory } from '@/modules/core/repositories/users' @@ -574,7 +572,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED storeAutomation: storeAutomationFactory({ db: projectDb }), storeAutomationToken: storeAutomationTokenFactory({ db: projectDb }), validateStreamAccess, - automationsEventsEmit: AutomationsEmitter.emit + eventEmit: getEventBus().emit }) return ( @@ -593,7 +591,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED getAutomation: getAutomationFactory({ db: projectDb }), updateAutomation: updateAutomationFactory({ db: projectDb }), validateStreamAccess, - automationsEventsEmit: AutomationsEmitter.emit + eventEmit: getEventBus().emit }) return await update({ @@ -616,7 +614,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED buildDecryptor }), getFunctionReleases, - automationsEventsEmit: AutomationsEmitter.emit, + eventEmit: getEventBus().emit, validateStreamAccess }) @@ -643,7 +641,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED buildDecryptor }), createAppToken, - automateRunsEmitter: AutomateRunsEmitter.emit, + emitEvent: getEventBus().emit, getAutomationToken: getAutomationTokenFactory({ db: projectDb }), upsertAutomationRun: upsertAutomationRunFactory({ db: projectDb }), getFullAutomationRevisionMetadata: @@ -672,7 +670,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED storeAutomation: storeAutomationFactory({ db: projectDb }), storeAutomationRevision: storeAutomationRevisionFactory({ db: projectDb }), validateStreamAccess, - automationsEventsEmit: AutomationsEmitter.emit + eventEmit: getEventBus().emit }) return await create({ @@ -900,7 +898,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED automationRunUpdater: updateAutomationRunFactory({ db: projectDb }), - runEventEmit: AutomateRunsEmitter.emit + emitEvent: getEventBus().emit } const payload = { diff --git a/packages/server/modules/automate/index.ts b/packages/server/modules/automate/index.ts index 2ca3e4ae9..c8a79fa83 100644 --- a/packages/server/modules/automate/index.ts +++ b/packages/server/modules/automate/index.ts @@ -28,12 +28,7 @@ import authGithubAppRest from '@/modules/automate/rest/authGithubApp' import { getFeatureFlags } from '@/modules/shared/helpers/envHelper' import { TokenScopeData } from '@/modules/shared/domain/rolesAndScopes/types' import { db } from '@/db/knex' -import { - AutomationsEmitter, - AutomationsEvents -} from '@/modules/automate/events/automations' import { ProjectSubscriptions, publish } from '@/modules/shared/utils/subscriptions' -import { AutomateRunsEmitter, AutomateRunsEvents } from '@/modules/automate/events/runs' import { getBranchLatestCommitsFactory } from '@/modules/core/repositories/branches' import { getCommitFactory } from '@/modules/core/repositories/commits' import { legacyGetUserFactory } from '@/modules/core/repositories/users' @@ -59,6 +54,7 @@ import { mixpanel } from '@/modules/shared/utils/mixpanel' import { getProjectFactory } from '@/modules/core/repositories/projects' import { getEventBus } from '@/modules/shared/services/eventBus' import { VersionEvents } from '@/modules/core/domain/commits/events' +import { AutomationEvents, AutomationRunEvents } from '@/modules/automate/domain/events' const { FF_AUTOMATE_MODULE_ENABLED } = getFeatureFlags() let quitListeners: Optional<() => void> = undefined @@ -115,7 +111,7 @@ const initializeEventListeners = () => { buildDecryptor }), createAppToken, - automateRunsEmitter: AutomateRunsEmitter.emit, + emitEvent: getEventBus().emit, getAutomationToken: getAutomationTokenFactory({ db: projectDb }), upsertAutomationRun: upsertAutomationRunFactory({ db: projectDb }), getFullAutomationRevisionMetadata: getFullAutomationRevisionMetadataFactory( @@ -128,31 +124,37 @@ const initializeEventListeners = () => { } ), // Automation management events - AutomationsEmitter.listen(AutomationsEvents.Created, async ({ automation }) => { - await publish(ProjectSubscriptions.ProjectAutomationsUpdated, { - projectId: automation.projectId, - projectAutomationsUpdated: { - type: ProjectAutomationsUpdatedMessageType.Created, - automationId: automation.id, - automation, - revision: null - } - }) - }), - AutomationsEmitter.listen(AutomationsEvents.Updated, async ({ automation }) => { - await publish(ProjectSubscriptions.ProjectAutomationsUpdated, { - projectId: automation.projectId, - projectAutomationsUpdated: { - type: ProjectAutomationsUpdatedMessageType.Updated, - automationId: automation.id, - automation, - revision: null - } - }) - }), - AutomationsEmitter.listen( - AutomationsEvents.CreatedRevision, - async ({ automation, revision }) => { + getEventBus().listen( + AutomationEvents.Created, + async ({ payload: { automation } }) => { + await publish(ProjectSubscriptions.ProjectAutomationsUpdated, { + projectId: automation.projectId, + projectAutomationsUpdated: { + type: ProjectAutomationsUpdatedMessageType.Created, + automationId: automation.id, + automation, + revision: null + } + }) + } + ), + getEventBus().listen( + AutomationEvents.Updated, + async ({ payload: { automation } }) => { + await publish(ProjectSubscriptions.ProjectAutomationsUpdated, { + projectId: automation.projectId, + projectAutomationsUpdated: { + type: ProjectAutomationsUpdatedMessageType.Updated, + automationId: automation.id, + automation, + revision: null + } + }) + } + ), + getEventBus().listen( + AutomationEvents.CreatedRevision, + async ({ payload: { automation, revision } }) => { await publish(ProjectSubscriptions.ProjectAutomationsUpdated, { projectId: automation.projectId, projectAutomationsUpdated: { @@ -168,9 +170,9 @@ const initializeEventListeners = () => { } ), // Automation run lifecycle events - AutomateRunsEmitter.listen( - AutomateRunsEvents.Created, - async ({ manifests, run, automation }) => { + getEventBus().listen( + AutomationRunEvents.Created, + async ({ payload: { manifests, run, automation } }) => { const validatedManifests = manifests .map((manifest) => { if (isVersionCreatedTriggerManifest(manifest)) { @@ -214,9 +216,9 @@ const initializeEventListeners = () => { ) } ), - AutomateRunsEmitter.listen( - AutomateRunsEvents.StatusUpdated, - async ({ run, functionRun, automationId, projectId }) => { + getEventBus().listen( + AutomationRunEvents.StatusUpdated, + async ({ payload: { run, functionRun, automationId, projectId } }) => { const projectDb = await getProjectDbClient({ projectId }) const triggers = await getAutomationRunFullTriggersFactory({ db: projectDb })({ @@ -253,9 +255,9 @@ const initializeEventListeners = () => { } ), // Mixpanel events - AutomateRunsEmitter.listen( - AutomateRunsEvents.StatusUpdated, - async ({ run, functionRun, automationId, projectId }) => { + getEventBus().listen( + AutomationRunEvents.StatusUpdated, + async ({ payload: { run, functionRun, automationId, projectId } }) => { if (!isFinished(run.status)) return const projectDb = await getProjectDbClient({ projectId }) @@ -301,9 +303,9 @@ const initializeEventListeners = () => { }) } ), - AutomateRunsEmitter.listen( - AutomateRunsEvents.Created, - async ({ automation, run: automationRun, source, manifests }) => { + getEventBus().listen( + AutomationRunEvents.Created, + async ({ payload: { automation, run: automationRun, source, manifests } }) => { const manifest = manifests.at(0) if (!manifest || !isVersionCreatedTriggerManifest(manifest)) { automateLogger.error('Unexpected run trigger manifest type', { diff --git a/packages/server/modules/automate/services/automationManagement.ts b/packages/server/modules/automate/services/automationManagement.ts index 85753daac..14a4c9110 100644 --- a/packages/server/modules/automate/services/automationManagement.ts +++ b/packages/server/modules/automate/services/automationManagement.ts @@ -38,10 +38,6 @@ import { TriggeredAutomationsStatusGraphQLReturn } from '@/modules/automate/help import { FunctionInputDecryptor } from '@/modules/automate/services/encryption' import { LibsodiumEncryptionError } from '@/modules/shared/errors/encryption' import { validateInputAgainstFunctionSchema } from '@/modules/automate/utils/inputSchemaValidator' -import { - AutomationsEmitter, - AutomationsEventsEmit -} from '@/modules/automate/events/automations' import { validateAutomationName } from '@/modules/automate/utils/automationConfigurationValidator' import { CreateAutomation, @@ -56,6 +52,8 @@ import { } from '@/modules/automate/domain/operations' import { GetBranchesByIds } from '@/modules/core/domain/branches/operations' import { ValidateStreamAccess } from '@/modules/core/domain/streams/operations' +import { EventBusEmit } from '@/modules/shared/services/eventBus' +import { AutomationEvents } from '@/modules/automate/domain/events' export type CreateAutomationDeps = { createAuthCode: CreateStoredAuthCode @@ -63,7 +61,7 @@ export type CreateAutomationDeps = { storeAutomation: StoreAutomation storeAutomationToken: StoreAutomationToken validateStreamAccess: ValidateStreamAccess - automationsEventsEmit: AutomationsEventsEmit + eventEmit: EventBusEmit } export const createAutomationFactory = @@ -86,7 +84,7 @@ export const createAutomationFactory = storeAutomation, storeAutomationToken, validateStreamAccess, - automationsEventsEmit + eventEmit } = deps validateAutomationName(name) @@ -129,8 +127,11 @@ export const createAutomationFactory = automateToken: token }) - await automationsEventsEmit(AutomationsEmitter.events.Created, { - automation: automationRecord + await eventEmit({ + eventName: AutomationEvents.Created, + payload: { + automation: automationRecord + } }) return { automation: automationRecord, token: automationTokenRecord } @@ -142,7 +143,7 @@ export type CreateTestAutomationDeps = { storeAutomation: StoreAutomation storeAutomationRevision: StoreAutomationRevision validateStreamAccess: ValidateStreamAccess - automationsEventsEmit: AutomationsEventsEmit + eventEmit: EventBusEmit } /** @@ -169,7 +170,7 @@ export const createTestAutomationFactory = storeAutomation, storeAutomationRevision, validateStreamAccess, - automationsEventsEmit + eventEmit } = deps validateAutomationName(name) @@ -208,8 +209,11 @@ export const createTestAutomationFactory = isTestAutomation: true }) - await AutomationsEmitter.emit(AutomationsEmitter.events.Created, { - automation: automationRecord + await eventEmit({ + eventName: AutomationEvents.Created, + payload: { + automation: automationRecord + } }) // Create and store the automation revision @@ -235,9 +239,12 @@ export const createTestAutomationFactory = publicKey: encryptionKeyPair.publicKey }) - await automationsEventsEmit(AutomationsEmitter.events.CreatedRevision, { - automation: automationRecord, - revision: automationRevisionRecord + await eventEmit({ + eventName: AutomationEvents.CreatedRevision, + payload: { + automation: automationRecord, + revision: automationRevisionRecord + } }) return automationRecord @@ -247,7 +254,7 @@ export type ValidateAndUpdateAutomationDeps = { getAutomation: GetAutomation updateAutomation: UpdateAutomation validateStreamAccess: ValidateStreamAccess - automationsEventsEmit: AutomationsEventsEmit + eventEmit: EventBusEmit } export const validateAndUpdateAutomationFactory = @@ -261,12 +268,7 @@ export const validateAndUpdateAutomationFactory = */ projectId?: string }) => { - const { - getAutomation, - updateAutomation, - validateStreamAccess, - automationsEventsEmit - } = deps + const { getAutomation, updateAutomation, validateStreamAccess, eventEmit } = deps const { input, userId, userResourceAccessRules, projectId } = params const existingAutomation = await getAutomation({ @@ -297,8 +299,11 @@ export const validateAndUpdateAutomationFactory = id: input.id }) - await automationsEventsEmit(AutomationsEmitter.events.Updated, { - automation: res + await eventEmit({ + eventName: AutomationEvents.Updated, + payload: { + automation: res + } }) return res @@ -396,7 +401,7 @@ export type CreateAutomationRevisionDeps = { getFunctionInputDecryptor: FunctionInputDecryptor getFunctionReleases: typeof getFunctionReleases validateStreamAccess: ValidateStreamAccess - automationsEventsEmit: AutomationsEventsEmit + eventEmit: EventBusEmit } & ValidateNewTriggerDefinitionsDeps & ValidateNewRevisionFunctionsDeps @@ -416,7 +421,7 @@ export const createAutomationRevisionFactory = getFunctionInputDecryptor, getFunctionReleases, validateStreamAccess, - automationsEventsEmit + eventEmit } = deps const existingAutomation = await getAutomation({ @@ -538,9 +543,12 @@ export const createAutomationRevisionFactory = } const res = await storeAutomationRevision(revisionInput) - await automationsEventsEmit(AutomationsEmitter.events.CreatedRevision, { - automation: existingAutomation, - revision: res + await eventEmit({ + eventName: AutomationEvents.CreatedRevision, + payload: { + automation: existingAutomation, + revision: res + } }) return res diff --git a/packages/server/modules/automate/services/runsManagement.ts b/packages/server/modules/automate/services/runsManagement.ts index 3e6827717..32ce63362 100644 --- a/packages/server/modules/automate/services/runsManagement.ts +++ b/packages/server/modules/automate/services/runsManagement.ts @@ -1,3 +1,4 @@ +import { AutomationRunEvents } from '@/modules/automate/domain/events' import { GetFunctionRun, UpdateAutomationRun, @@ -7,15 +8,12 @@ import { FunctionRunReportStatusError, FunctionRunNotFoundError } from '@/modules/automate/errors/runs' -import { - AutomateRunsEmitter, - AutomateRunsEventsEmitter -} from '@/modules/automate/events/runs' import { AutomationFunctionRunRecord, AutomationRunStatus, AutomationRunStatuses } from '@/modules/automate/helpers/types' +import { EventBusEmit } from '@/modules/shared/services/eventBus' import { Automate } from '@speckle/shared' const AutomationRunStatusOrder: { [key in AutomationRunStatus]: number } = { @@ -91,7 +89,7 @@ export type ReportFunctionRunStatusDeps = { getAutomationFunctionRunRecord: GetFunctionRun upsertAutomationFunctionRunRecord: UpsertAutomationFunctionRun automationRunUpdater: UpdateAutomationRun - runEventEmit: AutomateRunsEventsEmitter + emitEvent: EventBusEmit } export const reportFunctionRunStatusFactory = @@ -106,7 +104,7 @@ export const reportFunctionRunStatusFactory = getAutomationFunctionRunRecord, upsertAutomationFunctionRunRecord, automationRunUpdater, - runEventEmit + emitEvent } = deps const { projectId, runId, ...statusReportData } = params @@ -147,11 +145,14 @@ export const reportFunctionRunStatusFactory = updatedAt: new Date() }) - await runEventEmit(AutomateRunsEmitter.events.StatusUpdated, { - run: updatedRun, - functionRun: nextFunctionRunRecord, - automationId, - projectId + await emitEvent({ + eventName: AutomationRunEvents.StatusUpdated, + payload: { + run: updatedRun, + functionRun: nextFunctionRunRecord, + automationId, + projectId + } }) return true diff --git a/packages/server/modules/automate/services/trigger.ts b/packages/server/modules/automate/services/trigger.ts index 8d72e5fe9..855e0dfe3 100644 --- a/packages/server/modules/automate/services/trigger.ts +++ b/packages/server/modules/automate/services/trigger.ts @@ -28,10 +28,6 @@ import { TokenResourceIdentifierType } from '@/modules/core/graph/generated/grap import { automateLogger } from '@/logging/logging' import { FunctionInputDecryptor } from '@/modules/automate/services/encryption' import { LibsodiumEncryptionError } from '@/modules/shared/errors/encryption' -import { - AutomateRunsEmitter, - AutomateRunsEventsEmitter -} from '@/modules/automate/events/runs' import { GetActiveTriggerDefinitions, GetAutomation, @@ -48,6 +44,8 @@ import { GetBranchLatestCommits } from '@/modules/core/domain/branches/operation import { GetCommit } from '@/modules/core/domain/commits/operations' import { ValidateStreamAccess } from '@/modules/core/domain/streams/operations' import { CreateAndStoreAppToken } from '@/modules/core/domain/tokens/operations' +import { EventBusEmit } from '@/modules/shared/services/eventBus' +import { AutomationRunEvents } from '@/modules/automate/domain/events' export type OnModelVersionCreateDeps = { getAutomation: GetAutomation @@ -214,7 +212,7 @@ export type TriggerAutomationRevisionRunDeps = { getAutomationToken: GetAutomationToken createAppToken: CreateAndStoreAppToken upsertAutomationRun: UpsertAutomationRun - automateRunsEmitter: AutomateRunsEventsEmitter + emitEvent: EventBusEmit getFullAutomationRevisionMetadata: GetFullAutomationRevisionMetadata getCommit: GetCommit } & CreateAutomationRunDataDeps & @@ -235,7 +233,7 @@ export const triggerAutomationRevisionRunFactory = getAutomationToken, createAppToken, upsertAutomationRun, - automateRunsEmitter, + emitEvent, getFullAutomationRevisionMetadata, getCommit } = deps @@ -316,12 +314,15 @@ export const triggerAutomationRevisionRunFactory = await upsertAutomationRun(automationRun) } - await automateRunsEmitter(AutomateRunsEmitter.events.Created, { - run: automationRun, - manifests: triggerManifests, - automation: automationWithRevision, - source, - triggerType: manifest.triggerType + await emitEvent({ + eventName: AutomationRunEvents.Created, + payload: { + run: automationRun, + manifests: triggerManifests, + automation: automationWithRevision, + source, + triggerType: manifest.triggerType + } }) return { automationRunId: automationRun.id } diff --git a/packages/server/modules/automate/tests/automations.spec.ts b/packages/server/modules/automate/tests/automations.spec.ts index c5edd6bb5..cb916dcd0 100644 --- a/packages/server/modules/automate/tests/automations.spec.ts +++ b/packages/server/modules/automate/tests/automations.spec.ts @@ -42,7 +42,6 @@ import { expect } from 'chai' import { times } from 'lodash' import { getFeatureFlags } from '@/modules/shared/helpers/envHelper' import { db } from '@/db/knex' -import { AutomationsEmitter } from '@/modules/automate/events/automations' import { addOrUpdateStreamCollaboratorFactory, validateStreamAccessFactory @@ -56,6 +55,8 @@ import { import { saveActivityFactory } from '@/modules/activitystream/repositories' import { publish } from '@/modules/shared/utils/subscriptions' import { getUserFactory } from '@/modules/core/repositories/users' +import { getEventBus } from '@/modules/shared/services/eventBus' +import { AutomationEvents } from '@/modules/automate/domain/events' /** * TODO: Extra test ideas @@ -89,7 +90,7 @@ const buildAutomationUpdate = () => { getAutomation, updateAutomation: updateDbAutomation, validateStreamAccess, - automationsEventsEmit: AutomationsEmitter.emit + eventEmit: getEventBus().emit }) return update @@ -182,10 +183,17 @@ const buildAutomationUpdate = () => { }) it('creates an automation', async () => { + let eventFired = false + const name = 'My Super Automation #1' + + getEventBus().listenOnce(AutomationEvents.Created, async ({ payload }) => { + expect(payload.automation.name).to.equal(name) + eventFired = true + }) const create = buildAutomationCreate() const automation = await create({ - input: { name: 'Automation #1', enabled: true }, + input: { name, enabled: true }, projectId: myStream.id, userId: me.id }) @@ -193,6 +201,8 @@ const buildAutomationUpdate = () => { expect(automation).to.be.ok expect(automation.automation).to.be.ok expect(automation.token).to.be.ok + expect(automation.automation.name).to.equal(name) + expect(eventFired).to.be.true }) }) @@ -264,6 +274,14 @@ const buildAutomationUpdate = () => { userId: me.id }) + let eventFired = false + getEventBus().listenOnce(AutomationEvents.Updated, async ({ payload }) => { + expect(payload.automation.name).to.equal(initAutomation.name) + expect(payload.automation.enabled).to.be.false + expect(payload.automation.id).to.equal(initAutomation.id) + eventFired = true + }) + const updatedAutomation = await update({ input: { id: initAutomation.id, enabled: false }, userId: me.id, @@ -273,6 +291,7 @@ const buildAutomationUpdate = () => { expect(updatedAutomation).to.be.ok expect(updatedAutomation.enabled).to.be.false expect(updatedAutomation.name).to.equal(initAutomation.name) + expect(eventFired).to.be.true }) it('updates all available properties', async () => { @@ -339,6 +358,15 @@ const buildAutomationUpdate = () => { }) it('works successfully', async () => { + let eventFired = false + getEventBus().listenOnce( + AutomationEvents.CreatedRevision, + async ({ payload }) => { + expect(payload.automation.id).to.equal(createdAutomation.automation.id) + expect(payload.revision).to.be.ok + eventFired = true + } + ) const create = buildAutomationRevisionCreate() const ret = await create({ @@ -352,6 +380,7 @@ const buildAutomationUpdate = () => { expect(ret.automationId).to.equal(createdAutomation.automation.id) expect(ret.triggers.length).to.be.ok expect(ret.functions.length).to.be.ok + expect(eventFired).to.be.true }) it('fails if automation does not exist', async () => { diff --git a/packages/server/modules/automate/tests/trigger.spec.ts b/packages/server/modules/automate/tests/trigger.spec.ts index ca83b998a..7db34b8d2 100644 --- a/packages/server/modules/automate/tests/trigger.spec.ts +++ b/packages/server/modules/automate/tests/trigger.spec.ts @@ -74,7 +74,6 @@ import { import { buildDecryptor } from '@/modules/shared/utils/libsodium' import { mapGqlStatusToDbStatus } from '@/modules/automate/utils/automateFunctionRunStatus' import { db } from '@/db/knex' -import { AutomateRunsEmitter } from '@/modules/automate/events/runs' import { getCommitFactory } from '@/modules/core/repositories/commits' import { validateStreamAccessFactory } from '@/modules/core/services/streams/access' import { authorizeResolver } from '@/modules/shared' @@ -85,6 +84,8 @@ import { storeTokenScopesFactory, storeUserServerAppTokenFactory } from '@/modules/core/repositories/tokens' +import { getEventBus } from '@/modules/shared/services/eventBus' +import { AutomationRunEvents } from '@/modules/automate/domain/events' const { FF_AUTOMATE_MODULE_ENABLED } = getFeatureFlags() @@ -336,7 +337,7 @@ const createAppToken = createAppTokenFactory({ }), getEncryptionKeyPairFor, createAppToken, - automateRunsEmitter: AutomateRunsEmitter.emit, + emitEvent: getEventBus().emit, getAutomationToken, upsertAutomationRun, getFullAutomationRevisionMetadata, @@ -430,7 +431,7 @@ const createAppToken = createAppTokenFactory({ }), getEncryptionKeyPairFor, createAppToken, - automateRunsEmitter: AutomateRunsEmitter.emit, + emitEvent: getEventBus().emit, getAutomationToken, upsertAutomationRun, getFullAutomationRevisionMetadata, @@ -521,6 +522,18 @@ const createAppToken = createAppTokenFactory({ } ] }) + + let eventFired = false + getEventBus().listenOnce( + AutomationRunEvents.Created, + async ({ payload }) => { + expect(payload.automation.id).to.equal(automation.id) + expect(payload.run.automationRevisionId).to.equal(automationRevisionId) + expect(payload.source).to.equal(RunTriggerSource.Manual) + eventFired = true + }, + { timeout: 1000 } + ) const executionEngineRunId = cryptoRandomString({ length: 10 }) const { automationRunId } = await triggerAutomationRevisionRunFactory({ automateRunTrigger: async () => ({ @@ -531,7 +544,7 @@ const createAppToken = createAppTokenFactory({ }), getEncryptionKeyPairFor, createAppToken, - automateRunsEmitter: AutomateRunsEmitter.emit, + emitEvent: getEventBus().emit, getAutomationToken, upsertAutomationRun, getFullAutomationRevisionMetadata, @@ -558,6 +571,7 @@ const createAppToken = createAppTokenFactory({ for (const run of storedRun.functionRuns) { expect(run.status).to.equal(expectedStatus) } + expect(eventFired).to.be.true }) }) describe('Run conditions are NOT met if', () => { @@ -1016,7 +1030,7 @@ const createAppToken = createAppTokenFactory({ }), getEncryptionKeyPairFor, createAppToken, - automateRunsEmitter: AutomateRunsEmitter.emit, + emitEvent: getEventBus().emit, getAutomationToken, upsertAutomationRun, getFullAutomationRevisionMetadata, @@ -1242,7 +1256,7 @@ const createAppToken = createAppTokenFactory({ getAutomationFunctionRunRecord: getFunctionRun, upsertAutomationFunctionRunRecord: upsertAutomationFunctionRun, automationRunUpdater: updateAutomationRun, - runEventEmit: AutomateRunsEmitter.emit + emitEvent: getEventBus().emit }) return report @@ -1361,6 +1375,15 @@ const createAppToken = createAppTokenFactory({ projectId: testUserStream.id } + let eventFired = false + getEventBus().listenOnce( + AutomationRunEvents.StatusUpdated, + async ({ payload }) => { + expect(payload.functionRun.id).to.equal(functionRunId) + eventFired = true + }, + { timeout: 1000 } + ) await expect(report(params)).to.eventually.be.true const [updatedRun, updatedFnRun] = await Promise.all([ @@ -1371,6 +1394,7 @@ const createAppToken = createAppTokenFactory({ expect(updatedRun?.status).to.equal(AutomationRunStatuses.succeeded) expect(updatedFnRun?.status).to.equal(AutomationRunStatuses.succeeded) expect(updatedFnRun?.contextView).to.equal(contextView) + expect(eventFired).to.be.true }) }) }) diff --git a/packages/server/modules/shared/services/eventBus.ts b/packages/server/modules/shared/services/eventBus.ts index d2ac81938..9eafc3997 100644 --- a/packages/server/modules/shared/services/eventBus.ts +++ b/packages/server/modules/shared/services/eventBus.ts @@ -39,6 +39,12 @@ import { commentEventsNamespace, CommentEventsPayloads } from '@/modules/comments/domain/events' +import { + automationEventsNamespace, + AutomationEventsPayloads, + automationRunEventsNamespace, + AutomationRunEventsPayloads +} from '@/modules/automate/domain/events' type AllEventsWildcard = '**' type EventWildcard = '*' @@ -65,6 +71,8 @@ type EventsByNamespace = { [versionEventsNamespace]: VersionEventsPayloads [accessRequestEventsNamespace]: AccessRequestEventsPayloads [commentEventsNamespace]: CommentEventsPayloads + [automationEventsNamespace]: AutomationEventsPayloads + [automationRunEventsNamespace]: AutomationRunEventsPayloads } type EventTypes = UnionToIntersection diff --git a/packages/server/modules/shared/services/moduleEventEmitterSetup.ts b/packages/server/modules/shared/services/moduleEventEmitterSetup.ts deleted file mode 100644 index ebf24d309..000000000 --- a/packages/server/modules/shared/services/moduleEventEmitterSetup.ts +++ /dev/null @@ -1,81 +0,0 @@ -import { moduleLogger, Observability } from '@/logging/logging' -import { MaybeAsync } from '@/modules/shared/helpers/typeHelper' -import EventEmitter from 'eventemitter2' - -export type ModuleEventEmitterParams = { - moduleName: string - /** - * If you have multiple emitters in a single module, you can use this identify - * each of them differently - */ - namespace?: string -} - -/** - * Initialize Speckle Module scoped event emitter. These can be used to make code more SOLID - instead of - * modifying some code that does X every time you want to do something extra when X occurs, just emit an event - * there and specify the listening code in a more appropriate module. - * - * Example: Instead of comment mentions being sent out from the comment repository's "createComment" function, - * this repo function emits a COMMENT_CREATED event, that is then handled in a more appropriate module - the speckle - * Notifications module. - */ -export function initializeModuleEventEmitter

>( - params: ModuleEventEmitterParams -) { - const { moduleName, namespace } = params - const identifier = namespace ? `${moduleName}-${namespace}` : moduleName - - const logger = Observability.extendLoggerComponent(moduleLogger, identifier, 'events') - - const errHandler = (e: unknown) => { - logger.error(e, `Unhandled ${identifier} event emitter error`) - } - - const emitter = new EventEmitter() - emitter.on('uncaughtException', errHandler) - emitter.on('error', errHandler) - - return { - /** - * Emit a module event. This function must be awaited to ensure all listeners - * execute. Any errors thrown in the listeners will bubble up and throw from - * the part of code that triggers this emit() call. - */ - emit: async (eventName: K, payload: P[K]) => { - return (await emitter.emitAsync(eventName, payload)) as unknown[] - }, - - /** - * Listen for module events. Any errors thrown here will bubble out of where - * emit() was invoked. - * - * @returns Callback for stopping listening - */ - listen: ( - eventName: K, - handler: (payload: P[K]) => MaybeAsync - ) => { - emitter.on(eventName, handler, { - async: true, - promisify: true - }) - - return () => { - emitter.removeListener(eventName, handler) - } - }, - - /** - * Destroy event emitter - */ - destroy() { - emitter.removeAllListeners() - }, - - /** - * Debugger scoped to this module event emitter - */ - logger - } -} diff --git a/packages/server/test/speckle-helpers/automationHelper.ts b/packages/server/test/speckle-helpers/automationHelper.ts index 81334c311..0e94fb315 100644 --- a/packages/server/test/speckle-helpers/automationHelper.ts +++ b/packages/server/test/speckle-helpers/automationHelper.ts @@ -39,9 +39,9 @@ import { } from '@/modules/automate/services/encryption' import { buildDecryptor } from '@/modules/shared/utils/libsodium' import { db } from '@/db/knex' -import { AutomationsEmitter } from '@/modules/automate/events/automations' import { validateStreamAccessFactory } from '@/modules/core/services/streams/access' import { authorizeResolver } from '@/modules/shared' +import { getEventBus } from '@/modules/shared/services/eventBus' const storeAutomation = storeAutomationFactory({ db }) const storeAutomationToken = storeAutomationTokenFactory({ db }) @@ -69,7 +69,7 @@ export const buildAutomationCreate = ( storeAutomation, storeAutomationToken, validateStreamAccess, - automationsEventsEmit: AutomationsEmitter.emit + eventEmit: getEventBus().emit }) return create @@ -101,7 +101,7 @@ export const buildAutomationRevisionCreate = ( buildDecryptor }), validateStreamAccess, - automationsEventsEmit: AutomationsEmitter.emit, + eventEmit: getEventBus().emit, ...overrides })