chore(server): getting rid of module-scoped eventBuses - batch #5 - automations & runs [final] (#3818)

* chore(server): getting rid of module-scoped eventBuses - batch #4 - comments

* chore(server): getting rid of module-scoped eventBuses - batch #5 - automations

* chore(server): getting rid of module-scoped eventBuses - batch #5 - automation runs
This commit is contained in:
Kristaps Fabians Geikins
2025-01-15 11:00:33 +02:00
committed by GitHub
parent e349ff32fa
commit 8df7eb714b
13 changed files with 238 additions and 269 deletions
@@ -0,0 +1,52 @@
import {
AutomationFunctionRunRecord,
AutomationRecord,
AutomationRunRecord,
AutomationTriggerType,
AutomationWithRevision,
BaseTriggerManifest,
RunTriggerSource
} from '@/modules/automate/helpers/types'
import {
InsertableAutomationRun,
StoredInsertableAutomationRevision
} from '@/modules/automate/repositories/automations'
export const automationEventsNamespace = 'automations' as const
export const automationRunEventsNamespace = 'automationRuns' as const
export const AutomationEvents = {
Created: `${automationEventsNamespace}.created`,
Updated: `${automationEventsNamespace}.updated`,
CreatedRevision: `${automationEventsNamespace}.created-revision`
} as const
export const AutomationRunEvents = {
Created: `${automationRunEventsNamespace}.created`,
StatusUpdated: `${automationRunEventsNamespace}.status-updated`
} as const
export type AutomationEventsPayloads = {
[AutomationEvents.Created]: { automation: AutomationRecord }
[AutomationEvents.Updated]: { automation: AutomationRecord }
[AutomationEvents.CreatedRevision]: {
automation: AutomationRecord
revision: StoredInsertableAutomationRevision
}
}
export type AutomationRunEventsPayloads = {
[AutomationRunEvents.Created]: {
automation: AutomationWithRevision
run: InsertableAutomationRun
manifests: BaseTriggerManifest[]
source: RunTriggerSource
triggerType: AutomationTriggerType
}
[AutomationRunEvents.StatusUpdated]: {
run: AutomationRunRecord
functionRun: AutomationFunctionRunRecord
automationId: string
projectId: string
}
}
@@ -1,32 +0,0 @@
import { AutomationRecord } from '@/modules/automate/helpers/types'
import { StoredInsertableAutomationRevision } from '@/modules/automate/repositories/automations'
import { initializeModuleEventEmitter } from '@/modules/shared/services/moduleEventEmitterSetup'
export enum AutomationsEvents {
Created = 'created',
Updated = 'updated',
CreatedRevision = 'created-revision'
}
export type AutomationsEventsPayloads = {
[AutomationsEvents.Created]: {
automation: AutomationRecord
}
[AutomationsEvents.Updated]: {
automation: AutomationRecord
}
[AutomationsEvents.CreatedRevision]: {
automation: AutomationRecord
revision: StoredInsertableAutomationRevision
}
}
const { emit, listen } = initializeModuleEventEmitter<AutomationsEventsPayloads>({
moduleName: 'automate',
namespace: 'runs'
})
export const AutomationsEmitter = { emit, listen, events: AutomationsEvents }
export type AutomationsEventsEmit = (typeof AutomationsEmitter)['emit']
export type AutomationsEventsListen = (typeof AutomationsEmitter)['listen']
@@ -1,41 +0,0 @@
import {
AutomationFunctionRunRecord,
AutomationRunRecord,
AutomationTriggerType,
AutomationWithRevision,
BaseTriggerManifest,
RunTriggerSource
} from '@/modules/automate/helpers/types'
import { InsertableAutomationRun } from '@/modules/automate/repositories/automations'
import { initializeModuleEventEmitter } from '@/modules/shared/services/moduleEventEmitterSetup'
export enum AutomateRunsEvents {
Created = 'created',
StatusUpdated = 'status-updated'
}
export type AutomateEventsPayloads = {
[AutomateRunsEvents.Created]: {
automation: AutomationWithRevision
run: InsertableAutomationRun
manifests: BaseTriggerManifest[]
source: RunTriggerSource
triggerType: AutomationTriggerType
}
[AutomateRunsEvents.StatusUpdated]: {
run: AutomationRunRecord
functionRun: AutomationFunctionRunRecord
automationId: string
projectId: string
}
}
const { emit, listen } = initializeModuleEventEmitter<AutomateEventsPayloads>({
moduleName: 'automate',
namespace: 'runs'
})
export const AutomateRunsEmitter = { emit, listen, events: AutomateRunsEvents }
export type AutomateRunsEventsEmitter = (typeof AutomateRunsEmitter)['emit']
export type AutomateRunsEventsListener = (typeof AutomateRunsEmitter)['listen']
@@ -112,8 +112,6 @@ import {
ExecutionEngineNetworkError
} from '@/modules/automate/errors/executionEngine'
import { db } from '@/db/knex'
import { AutomationsEmitter } from '@/modules/automate/events/automations'
import { AutomateRunsEmitter } from '@/modules/automate/events/runs'
import { getCommitFactory } from '@/modules/core/repositories/commits'
import { validateStreamAccessFactory } from '@/modules/core/services/streams/access'
import { getUserFactory } from '@/modules/core/repositories/users'
@@ -574,7 +572,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED
storeAutomation: storeAutomationFactory({ db: projectDb }),
storeAutomationToken: storeAutomationTokenFactory({ db: projectDb }),
validateStreamAccess,
automationsEventsEmit: AutomationsEmitter.emit
eventEmit: getEventBus().emit
})
return (
@@ -593,7 +591,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED
getAutomation: getAutomationFactory({ db: projectDb }),
updateAutomation: updateAutomationFactory({ db: projectDb }),
validateStreamAccess,
automationsEventsEmit: AutomationsEmitter.emit
eventEmit: getEventBus().emit
})
return await update({
@@ -616,7 +614,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED
buildDecryptor
}),
getFunctionReleases,
automationsEventsEmit: AutomationsEmitter.emit,
eventEmit: getEventBus().emit,
validateStreamAccess
})
@@ -643,7 +641,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED
buildDecryptor
}),
createAppToken,
automateRunsEmitter: AutomateRunsEmitter.emit,
emitEvent: getEventBus().emit,
getAutomationToken: getAutomationTokenFactory({ db: projectDb }),
upsertAutomationRun: upsertAutomationRunFactory({ db: projectDb }),
getFullAutomationRevisionMetadata:
@@ -672,7 +670,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED
storeAutomation: storeAutomationFactory({ db: projectDb }),
storeAutomationRevision: storeAutomationRevisionFactory({ db: projectDb }),
validateStreamAccess,
automationsEventsEmit: AutomationsEmitter.emit
eventEmit: getEventBus().emit
})
return await create({
@@ -900,7 +898,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED
automationRunUpdater: updateAutomationRunFactory({
db: projectDb
}),
runEventEmit: AutomateRunsEmitter.emit
emitEvent: getEventBus().emit
}
const payload = {
+45 -43
View File
@@ -28,12 +28,7 @@ import authGithubAppRest from '@/modules/automate/rest/authGithubApp'
import { getFeatureFlags } from '@/modules/shared/helpers/envHelper'
import { TokenScopeData } from '@/modules/shared/domain/rolesAndScopes/types'
import { db } from '@/db/knex'
import {
AutomationsEmitter,
AutomationsEvents
} from '@/modules/automate/events/automations'
import { ProjectSubscriptions, publish } from '@/modules/shared/utils/subscriptions'
import { AutomateRunsEmitter, AutomateRunsEvents } from '@/modules/automate/events/runs'
import { getBranchLatestCommitsFactory } from '@/modules/core/repositories/branches'
import { getCommitFactory } from '@/modules/core/repositories/commits'
import { legacyGetUserFactory } from '@/modules/core/repositories/users'
@@ -59,6 +54,7 @@ import { mixpanel } from '@/modules/shared/utils/mixpanel'
import { getProjectFactory } from '@/modules/core/repositories/projects'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { VersionEvents } from '@/modules/core/domain/commits/events'
import { AutomationEvents, AutomationRunEvents } from '@/modules/automate/domain/events'
const { FF_AUTOMATE_MODULE_ENABLED } = getFeatureFlags()
let quitListeners: Optional<() => void> = undefined
@@ -115,7 +111,7 @@ const initializeEventListeners = () => {
buildDecryptor
}),
createAppToken,
automateRunsEmitter: AutomateRunsEmitter.emit,
emitEvent: getEventBus().emit,
getAutomationToken: getAutomationTokenFactory({ db: projectDb }),
upsertAutomationRun: upsertAutomationRunFactory({ db: projectDb }),
getFullAutomationRevisionMetadata: getFullAutomationRevisionMetadataFactory(
@@ -128,31 +124,37 @@ const initializeEventListeners = () => {
}
),
// Automation management events
AutomationsEmitter.listen(AutomationsEvents.Created, async ({ automation }) => {
await publish(ProjectSubscriptions.ProjectAutomationsUpdated, {
projectId: automation.projectId,
projectAutomationsUpdated: {
type: ProjectAutomationsUpdatedMessageType.Created,
automationId: automation.id,
automation,
revision: null
}
})
}),
AutomationsEmitter.listen(AutomationsEvents.Updated, async ({ automation }) => {
await publish(ProjectSubscriptions.ProjectAutomationsUpdated, {
projectId: automation.projectId,
projectAutomationsUpdated: {
type: ProjectAutomationsUpdatedMessageType.Updated,
automationId: automation.id,
automation,
revision: null
}
})
}),
AutomationsEmitter.listen(
AutomationsEvents.CreatedRevision,
async ({ automation, revision }) => {
getEventBus().listen(
AutomationEvents.Created,
async ({ payload: { automation } }) => {
await publish(ProjectSubscriptions.ProjectAutomationsUpdated, {
projectId: automation.projectId,
projectAutomationsUpdated: {
type: ProjectAutomationsUpdatedMessageType.Created,
automationId: automation.id,
automation,
revision: null
}
})
}
),
getEventBus().listen(
AutomationEvents.Updated,
async ({ payload: { automation } }) => {
await publish(ProjectSubscriptions.ProjectAutomationsUpdated, {
projectId: automation.projectId,
projectAutomationsUpdated: {
type: ProjectAutomationsUpdatedMessageType.Updated,
automationId: automation.id,
automation,
revision: null
}
})
}
),
getEventBus().listen(
AutomationEvents.CreatedRevision,
async ({ payload: { automation, revision } }) => {
await publish(ProjectSubscriptions.ProjectAutomationsUpdated, {
projectId: automation.projectId,
projectAutomationsUpdated: {
@@ -168,9 +170,9 @@ const initializeEventListeners = () => {
}
),
// Automation run lifecycle events
AutomateRunsEmitter.listen(
AutomateRunsEvents.Created,
async ({ manifests, run, automation }) => {
getEventBus().listen(
AutomationRunEvents.Created,
async ({ payload: { manifests, run, automation } }) => {
const validatedManifests = manifests
.map((manifest) => {
if (isVersionCreatedTriggerManifest(manifest)) {
@@ -214,9 +216,9 @@ const initializeEventListeners = () => {
)
}
),
AutomateRunsEmitter.listen(
AutomateRunsEvents.StatusUpdated,
async ({ run, functionRun, automationId, projectId }) => {
getEventBus().listen(
AutomationRunEvents.StatusUpdated,
async ({ payload: { run, functionRun, automationId, projectId } }) => {
const projectDb = await getProjectDbClient({ projectId })
const triggers = await getAutomationRunFullTriggersFactory({ db: projectDb })({
@@ -253,9 +255,9 @@ const initializeEventListeners = () => {
}
),
// Mixpanel events
AutomateRunsEmitter.listen(
AutomateRunsEvents.StatusUpdated,
async ({ run, functionRun, automationId, projectId }) => {
getEventBus().listen(
AutomationRunEvents.StatusUpdated,
async ({ payload: { run, functionRun, automationId, projectId } }) => {
if (!isFinished(run.status)) return
const projectDb = await getProjectDbClient({ projectId })
@@ -301,9 +303,9 @@ const initializeEventListeners = () => {
})
}
),
AutomateRunsEmitter.listen(
AutomateRunsEvents.Created,
async ({ automation, run: automationRun, source, manifests }) => {
getEventBus().listen(
AutomationRunEvents.Created,
async ({ payload: { automation, run: automationRun, source, manifests } }) => {
const manifest = manifests.at(0)
if (!manifest || !isVersionCreatedTriggerManifest(manifest)) {
automateLogger.error('Unexpected run trigger manifest type', {
@@ -38,10 +38,6 @@ import { TriggeredAutomationsStatusGraphQLReturn } from '@/modules/automate/help
import { FunctionInputDecryptor } from '@/modules/automate/services/encryption'
import { LibsodiumEncryptionError } from '@/modules/shared/errors/encryption'
import { validateInputAgainstFunctionSchema } from '@/modules/automate/utils/inputSchemaValidator'
import {
AutomationsEmitter,
AutomationsEventsEmit
} from '@/modules/automate/events/automations'
import { validateAutomationName } from '@/modules/automate/utils/automationConfigurationValidator'
import {
CreateAutomation,
@@ -56,6 +52,8 @@ import {
} from '@/modules/automate/domain/operations'
import { GetBranchesByIds } from '@/modules/core/domain/branches/operations'
import { ValidateStreamAccess } from '@/modules/core/domain/streams/operations'
import { EventBusEmit } from '@/modules/shared/services/eventBus'
import { AutomationEvents } from '@/modules/automate/domain/events'
export type CreateAutomationDeps = {
createAuthCode: CreateStoredAuthCode
@@ -63,7 +61,7 @@ export type CreateAutomationDeps = {
storeAutomation: StoreAutomation
storeAutomationToken: StoreAutomationToken
validateStreamAccess: ValidateStreamAccess
automationsEventsEmit: AutomationsEventsEmit
eventEmit: EventBusEmit
}
export const createAutomationFactory =
@@ -86,7 +84,7 @@ export const createAutomationFactory =
storeAutomation,
storeAutomationToken,
validateStreamAccess,
automationsEventsEmit
eventEmit
} = deps
validateAutomationName(name)
@@ -129,8 +127,11 @@ export const createAutomationFactory =
automateToken: token
})
await automationsEventsEmit(AutomationsEmitter.events.Created, {
automation: automationRecord
await eventEmit({
eventName: AutomationEvents.Created,
payload: {
automation: automationRecord
}
})
return { automation: automationRecord, token: automationTokenRecord }
@@ -142,7 +143,7 @@ export type CreateTestAutomationDeps = {
storeAutomation: StoreAutomation
storeAutomationRevision: StoreAutomationRevision
validateStreamAccess: ValidateStreamAccess
automationsEventsEmit: AutomationsEventsEmit
eventEmit: EventBusEmit
}
/**
@@ -169,7 +170,7 @@ export const createTestAutomationFactory =
storeAutomation,
storeAutomationRevision,
validateStreamAccess,
automationsEventsEmit
eventEmit
} = deps
validateAutomationName(name)
@@ -208,8 +209,11 @@ export const createTestAutomationFactory =
isTestAutomation: true
})
await AutomationsEmitter.emit(AutomationsEmitter.events.Created, {
automation: automationRecord
await eventEmit({
eventName: AutomationEvents.Created,
payload: {
automation: automationRecord
}
})
// Create and store the automation revision
@@ -235,9 +239,12 @@ export const createTestAutomationFactory =
publicKey: encryptionKeyPair.publicKey
})
await automationsEventsEmit(AutomationsEmitter.events.CreatedRevision, {
automation: automationRecord,
revision: automationRevisionRecord
await eventEmit({
eventName: AutomationEvents.CreatedRevision,
payload: {
automation: automationRecord,
revision: automationRevisionRecord
}
})
return automationRecord
@@ -247,7 +254,7 @@ export type ValidateAndUpdateAutomationDeps = {
getAutomation: GetAutomation
updateAutomation: UpdateAutomation
validateStreamAccess: ValidateStreamAccess
automationsEventsEmit: AutomationsEventsEmit
eventEmit: EventBusEmit
}
export const validateAndUpdateAutomationFactory =
@@ -261,12 +268,7 @@ export const validateAndUpdateAutomationFactory =
*/
projectId?: string
}) => {
const {
getAutomation,
updateAutomation,
validateStreamAccess,
automationsEventsEmit
} = deps
const { getAutomation, updateAutomation, validateStreamAccess, eventEmit } = deps
const { input, userId, userResourceAccessRules, projectId } = params
const existingAutomation = await getAutomation({
@@ -297,8 +299,11 @@ export const validateAndUpdateAutomationFactory =
id: input.id
})
await automationsEventsEmit(AutomationsEmitter.events.Updated, {
automation: res
await eventEmit({
eventName: AutomationEvents.Updated,
payload: {
automation: res
}
})
return res
@@ -396,7 +401,7 @@ export type CreateAutomationRevisionDeps = {
getFunctionInputDecryptor: FunctionInputDecryptor
getFunctionReleases: typeof getFunctionReleases
validateStreamAccess: ValidateStreamAccess
automationsEventsEmit: AutomationsEventsEmit
eventEmit: EventBusEmit
} & ValidateNewTriggerDefinitionsDeps &
ValidateNewRevisionFunctionsDeps
@@ -416,7 +421,7 @@ export const createAutomationRevisionFactory =
getFunctionInputDecryptor,
getFunctionReleases,
validateStreamAccess,
automationsEventsEmit
eventEmit
} = deps
const existingAutomation = await getAutomation({
@@ -538,9 +543,12 @@ export const createAutomationRevisionFactory =
}
const res = await storeAutomationRevision(revisionInput)
await automationsEventsEmit(AutomationsEmitter.events.CreatedRevision, {
automation: existingAutomation,
revision: res
await eventEmit({
eventName: AutomationEvents.CreatedRevision,
payload: {
automation: existingAutomation,
revision: res
}
})
return res
@@ -1,3 +1,4 @@
import { AutomationRunEvents } from '@/modules/automate/domain/events'
import {
GetFunctionRun,
UpdateAutomationRun,
@@ -7,15 +8,12 @@ import {
FunctionRunReportStatusError,
FunctionRunNotFoundError
} from '@/modules/automate/errors/runs'
import {
AutomateRunsEmitter,
AutomateRunsEventsEmitter
} from '@/modules/automate/events/runs'
import {
AutomationFunctionRunRecord,
AutomationRunStatus,
AutomationRunStatuses
} from '@/modules/automate/helpers/types'
import { EventBusEmit } from '@/modules/shared/services/eventBus'
import { Automate } from '@speckle/shared'
const AutomationRunStatusOrder: { [key in AutomationRunStatus]: number } = {
@@ -91,7 +89,7 @@ export type ReportFunctionRunStatusDeps = {
getAutomationFunctionRunRecord: GetFunctionRun
upsertAutomationFunctionRunRecord: UpsertAutomationFunctionRun
automationRunUpdater: UpdateAutomationRun
runEventEmit: AutomateRunsEventsEmitter
emitEvent: EventBusEmit
}
export const reportFunctionRunStatusFactory =
@@ -106,7 +104,7 @@ export const reportFunctionRunStatusFactory =
getAutomationFunctionRunRecord,
upsertAutomationFunctionRunRecord,
automationRunUpdater,
runEventEmit
emitEvent
} = deps
const { projectId, runId, ...statusReportData } = params
@@ -147,11 +145,14 @@ export const reportFunctionRunStatusFactory =
updatedAt: new Date()
})
await runEventEmit(AutomateRunsEmitter.events.StatusUpdated, {
run: updatedRun,
functionRun: nextFunctionRunRecord,
automationId,
projectId
await emitEvent({
eventName: AutomationRunEvents.StatusUpdated,
payload: {
run: updatedRun,
functionRun: nextFunctionRunRecord,
automationId,
projectId
}
})
return true
@@ -28,10 +28,6 @@ import { TokenResourceIdentifierType } from '@/modules/core/graph/generated/grap
import { automateLogger } from '@/logging/logging'
import { FunctionInputDecryptor } from '@/modules/automate/services/encryption'
import { LibsodiumEncryptionError } from '@/modules/shared/errors/encryption'
import {
AutomateRunsEmitter,
AutomateRunsEventsEmitter
} from '@/modules/automate/events/runs'
import {
GetActiveTriggerDefinitions,
GetAutomation,
@@ -48,6 +44,8 @@ import { GetBranchLatestCommits } from '@/modules/core/domain/branches/operation
import { GetCommit } from '@/modules/core/domain/commits/operations'
import { ValidateStreamAccess } from '@/modules/core/domain/streams/operations'
import { CreateAndStoreAppToken } from '@/modules/core/domain/tokens/operations'
import { EventBusEmit } from '@/modules/shared/services/eventBus'
import { AutomationRunEvents } from '@/modules/automate/domain/events'
export type OnModelVersionCreateDeps = {
getAutomation: GetAutomation
@@ -214,7 +212,7 @@ export type TriggerAutomationRevisionRunDeps = {
getAutomationToken: GetAutomationToken
createAppToken: CreateAndStoreAppToken
upsertAutomationRun: UpsertAutomationRun
automateRunsEmitter: AutomateRunsEventsEmitter
emitEvent: EventBusEmit
getFullAutomationRevisionMetadata: GetFullAutomationRevisionMetadata
getCommit: GetCommit
} & CreateAutomationRunDataDeps &
@@ -235,7 +233,7 @@ export const triggerAutomationRevisionRunFactory =
getAutomationToken,
createAppToken,
upsertAutomationRun,
automateRunsEmitter,
emitEvent,
getFullAutomationRevisionMetadata,
getCommit
} = deps
@@ -316,12 +314,15 @@ export const triggerAutomationRevisionRunFactory =
await upsertAutomationRun(automationRun)
}
await automateRunsEmitter(AutomateRunsEmitter.events.Created, {
run: automationRun,
manifests: triggerManifests,
automation: automationWithRevision,
source,
triggerType: manifest.triggerType
await emitEvent({
eventName: AutomationRunEvents.Created,
payload: {
run: automationRun,
manifests: triggerManifests,
automation: automationWithRevision,
source,
triggerType: manifest.triggerType
}
})
return { automationRunId: automationRun.id }
@@ -42,7 +42,6 @@ import { expect } from 'chai'
import { times } from 'lodash'
import { getFeatureFlags } from '@/modules/shared/helpers/envHelper'
import { db } from '@/db/knex'
import { AutomationsEmitter } from '@/modules/automate/events/automations'
import {
addOrUpdateStreamCollaboratorFactory,
validateStreamAccessFactory
@@ -56,6 +55,8 @@ import {
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { publish } from '@/modules/shared/utils/subscriptions'
import { getUserFactory } from '@/modules/core/repositories/users'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { AutomationEvents } from '@/modules/automate/domain/events'
/**
* TODO: Extra test ideas
@@ -89,7 +90,7 @@ const buildAutomationUpdate = () => {
getAutomation,
updateAutomation: updateDbAutomation,
validateStreamAccess,
automationsEventsEmit: AutomationsEmitter.emit
eventEmit: getEventBus().emit
})
return update
@@ -182,10 +183,17 @@ const buildAutomationUpdate = () => {
})
it('creates an automation', async () => {
let eventFired = false
const name = 'My Super Automation #1'
getEventBus().listenOnce(AutomationEvents.Created, async ({ payload }) => {
expect(payload.automation.name).to.equal(name)
eventFired = true
})
const create = buildAutomationCreate()
const automation = await create({
input: { name: 'Automation #1', enabled: true },
input: { name, enabled: true },
projectId: myStream.id,
userId: me.id
})
@@ -193,6 +201,8 @@ const buildAutomationUpdate = () => {
expect(automation).to.be.ok
expect(automation.automation).to.be.ok
expect(automation.token).to.be.ok
expect(automation.automation.name).to.equal(name)
expect(eventFired).to.be.true
})
})
@@ -264,6 +274,14 @@ const buildAutomationUpdate = () => {
userId: me.id
})
let eventFired = false
getEventBus().listenOnce(AutomationEvents.Updated, async ({ payload }) => {
expect(payload.automation.name).to.equal(initAutomation.name)
expect(payload.automation.enabled).to.be.false
expect(payload.automation.id).to.equal(initAutomation.id)
eventFired = true
})
const updatedAutomation = await update({
input: { id: initAutomation.id, enabled: false },
userId: me.id,
@@ -273,6 +291,7 @@ const buildAutomationUpdate = () => {
expect(updatedAutomation).to.be.ok
expect(updatedAutomation.enabled).to.be.false
expect(updatedAutomation.name).to.equal(initAutomation.name)
expect(eventFired).to.be.true
})
it('updates all available properties', async () => {
@@ -339,6 +358,15 @@ const buildAutomationUpdate = () => {
})
it('works successfully', async () => {
let eventFired = false
getEventBus().listenOnce(
AutomationEvents.CreatedRevision,
async ({ payload }) => {
expect(payload.automation.id).to.equal(createdAutomation.automation.id)
expect(payload.revision).to.be.ok
eventFired = true
}
)
const create = buildAutomationRevisionCreate()
const ret = await create({
@@ -352,6 +380,7 @@ const buildAutomationUpdate = () => {
expect(ret.automationId).to.equal(createdAutomation.automation.id)
expect(ret.triggers.length).to.be.ok
expect(ret.functions.length).to.be.ok
expect(eventFired).to.be.true
})
it('fails if automation does not exist', async () => {
@@ -74,7 +74,6 @@ import {
import { buildDecryptor } from '@/modules/shared/utils/libsodium'
import { mapGqlStatusToDbStatus } from '@/modules/automate/utils/automateFunctionRunStatus'
import { db } from '@/db/knex'
import { AutomateRunsEmitter } from '@/modules/automate/events/runs'
import { getCommitFactory } from '@/modules/core/repositories/commits'
import { validateStreamAccessFactory } from '@/modules/core/services/streams/access'
import { authorizeResolver } from '@/modules/shared'
@@ -85,6 +84,8 @@ import {
storeTokenScopesFactory,
storeUserServerAppTokenFactory
} from '@/modules/core/repositories/tokens'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { AutomationRunEvents } from '@/modules/automate/domain/events'
const { FF_AUTOMATE_MODULE_ENABLED } = getFeatureFlags()
@@ -336,7 +337,7 @@ const createAppToken = createAppTokenFactory({
}),
getEncryptionKeyPairFor,
createAppToken,
automateRunsEmitter: AutomateRunsEmitter.emit,
emitEvent: getEventBus().emit,
getAutomationToken,
upsertAutomationRun,
getFullAutomationRevisionMetadata,
@@ -430,7 +431,7 @@ const createAppToken = createAppTokenFactory({
}),
getEncryptionKeyPairFor,
createAppToken,
automateRunsEmitter: AutomateRunsEmitter.emit,
emitEvent: getEventBus().emit,
getAutomationToken,
upsertAutomationRun,
getFullAutomationRevisionMetadata,
@@ -521,6 +522,18 @@ const createAppToken = createAppTokenFactory({
}
]
})
let eventFired = false
getEventBus().listenOnce(
AutomationRunEvents.Created,
async ({ payload }) => {
expect(payload.automation.id).to.equal(automation.id)
expect(payload.run.automationRevisionId).to.equal(automationRevisionId)
expect(payload.source).to.equal(RunTriggerSource.Manual)
eventFired = true
},
{ timeout: 1000 }
)
const executionEngineRunId = cryptoRandomString({ length: 10 })
const { automationRunId } = await triggerAutomationRevisionRunFactory({
automateRunTrigger: async () => ({
@@ -531,7 +544,7 @@ const createAppToken = createAppTokenFactory({
}),
getEncryptionKeyPairFor,
createAppToken,
automateRunsEmitter: AutomateRunsEmitter.emit,
emitEvent: getEventBus().emit,
getAutomationToken,
upsertAutomationRun,
getFullAutomationRevisionMetadata,
@@ -558,6 +571,7 @@ const createAppToken = createAppTokenFactory({
for (const run of storedRun.functionRuns) {
expect(run.status).to.equal(expectedStatus)
}
expect(eventFired).to.be.true
})
})
describe('Run conditions are NOT met if', () => {
@@ -1016,7 +1030,7 @@ const createAppToken = createAppTokenFactory({
}),
getEncryptionKeyPairFor,
createAppToken,
automateRunsEmitter: AutomateRunsEmitter.emit,
emitEvent: getEventBus().emit,
getAutomationToken,
upsertAutomationRun,
getFullAutomationRevisionMetadata,
@@ -1242,7 +1256,7 @@ const createAppToken = createAppTokenFactory({
getAutomationFunctionRunRecord: getFunctionRun,
upsertAutomationFunctionRunRecord: upsertAutomationFunctionRun,
automationRunUpdater: updateAutomationRun,
runEventEmit: AutomateRunsEmitter.emit
emitEvent: getEventBus().emit
})
return report
@@ -1361,6 +1375,15 @@ const createAppToken = createAppTokenFactory({
projectId: testUserStream.id
}
let eventFired = false
getEventBus().listenOnce(
AutomationRunEvents.StatusUpdated,
async ({ payload }) => {
expect(payload.functionRun.id).to.equal(functionRunId)
eventFired = true
},
{ timeout: 1000 }
)
await expect(report(params)).to.eventually.be.true
const [updatedRun, updatedFnRun] = await Promise.all([
@@ -1371,6 +1394,7 @@ const createAppToken = createAppTokenFactory({
expect(updatedRun?.status).to.equal(AutomationRunStatuses.succeeded)
expect(updatedFnRun?.status).to.equal(AutomationRunStatuses.succeeded)
expect(updatedFnRun?.contextView).to.equal(contextView)
expect(eventFired).to.be.true
})
})
})
@@ -39,6 +39,12 @@ import {
commentEventsNamespace,
CommentEventsPayloads
} from '@/modules/comments/domain/events'
import {
automationEventsNamespace,
AutomationEventsPayloads,
automationRunEventsNamespace,
AutomationRunEventsPayloads
} from '@/modules/automate/domain/events'
type AllEventsWildcard = '**'
type EventWildcard = '*'
@@ -65,6 +71,8 @@ type EventsByNamespace = {
[versionEventsNamespace]: VersionEventsPayloads
[accessRequestEventsNamespace]: AccessRequestEventsPayloads
[commentEventsNamespace]: CommentEventsPayloads
[automationEventsNamespace]: AutomationEventsPayloads
[automationRunEventsNamespace]: AutomationRunEventsPayloads
}
type EventTypes = UnionToIntersection<EventsByNamespace[keyof EventsByNamespace]>
@@ -1,81 +0,0 @@
import { moduleLogger, Observability } from '@/logging/logging'
import { MaybeAsync } from '@/modules/shared/helpers/typeHelper'
import EventEmitter from 'eventemitter2'
export type ModuleEventEmitterParams = {
moduleName: string
/**
* If you have multiple emitters in a single module, you can use this identify
* each of them differently
*/
namespace?: string
}
/**
* Initialize Speckle Module scoped event emitter. These can be used to make code more SOLID - instead of
* modifying some code that does X every time you want to do something extra when X occurs, just emit an event
* there and specify the listening code in a more appropriate module.
*
* Example: Instead of comment mentions being sent out from the comment repository's "createComment" function,
* this repo function emits a COMMENT_CREATED event, that is then handled in a more appropriate module - the speckle
* Notifications module.
*/
export function initializeModuleEventEmitter<P extends Record<string, unknown>>(
params: ModuleEventEmitterParams
) {
const { moduleName, namespace } = params
const identifier = namespace ? `${moduleName}-${namespace}` : moduleName
const logger = Observability.extendLoggerComponent(moduleLogger, identifier, 'events')
const errHandler = (e: unknown) => {
logger.error(e, `Unhandled ${identifier} event emitter error`)
}
const emitter = new EventEmitter()
emitter.on('uncaughtException', errHandler)
emitter.on('error', errHandler)
return {
/**
* Emit a module event. This function must be awaited to ensure all listeners
* execute. Any errors thrown in the listeners will bubble up and throw from
* the part of code that triggers this emit() call.
*/
emit: async <K extends keyof P & string>(eventName: K, payload: P[K]) => {
return (await emitter.emitAsync(eventName, payload)) as unknown[]
},
/**
* Listen for module events. Any errors thrown here will bubble out of where
* emit() was invoked.
*
* @returns Callback for stopping listening
*/
listen: <K extends keyof P & string>(
eventName: K,
handler: (payload: P[K]) => MaybeAsync<void>
) => {
emitter.on(eventName, handler, {
async: true,
promisify: true
})
return () => {
emitter.removeListener(eventName, handler)
}
},
/**
* Destroy event emitter
*/
destroy() {
emitter.removeAllListeners()
},
/**
* Debugger scoped to this module event emitter
*/
logger
}
}
@@ -39,9 +39,9 @@ import {
} from '@/modules/automate/services/encryption'
import { buildDecryptor } from '@/modules/shared/utils/libsodium'
import { db } from '@/db/knex'
import { AutomationsEmitter } from '@/modules/automate/events/automations'
import { validateStreamAccessFactory } from '@/modules/core/services/streams/access'
import { authorizeResolver } from '@/modules/shared'
import { getEventBus } from '@/modules/shared/services/eventBus'
const storeAutomation = storeAutomationFactory({ db })
const storeAutomationToken = storeAutomationTokenFactory({ db })
@@ -69,7 +69,7 @@ export const buildAutomationCreate = (
storeAutomation,
storeAutomationToken,
validateStreamAccess,
automationsEventsEmit: AutomationsEmitter.emit
eventEmit: getEventBus().emit
})
return create
@@ -101,7 +101,7 @@ export const buildAutomationRevisionCreate = (
buildDecryptor
}),
validateStreamAccess,
automationsEventsEmit: AutomationsEmitter.emit,
eventEmit: getEventBus().emit,
...overrides
})