fix(automate): automate module multi region (#3531)

* refactor(automate): logs api can get the projectId from the path

* fix(automate): multiregion gql resolvers

* fix(automate): multiregion event listeners

* fix(automate): drop automationCount

* fix(automate): multiregion run status

* fix(automate): correctness

* fix(automate): actually finish event listeners

* chore(automate): fix tests fix tests

* fix(automate): fix tests but make it multiregion flavor

* fix(automate): logs endpoint

* chore(automate): globalDb to db

* fix(automate): inject projectid correctly

* fix(automate): debug log fetch failure

* chore(automate): fix tests for new message

---------

Co-authored-by: Gergő Jedlicska <gergo@jedlicska.com>
This commit is contained in:
Chuck Driesler
2024-11-27 15:26:09 +00:00
committed by GitHub
parent a90bc3a1ed
commit fd5f316af0
31 changed files with 526 additions and 599 deletions
@@ -38,10 +38,6 @@
<span class="font-medium">Last published:&nbsp;</span>
<CommonText :text="publishedAt" />
</div>
<div>
<span class="font-medium">Used by:&nbsp;</span>
<CommonText :text="`${fn.automationCount} automations`" />
</div>
<CommonTextLink v-if="latestRelease?.inputSchema" @click="onViewParameters">
View parameters
</CommonTextLink>
@@ -110,7 +106,6 @@ graphql(`
owner
name
}
automationCount
description
releases(limit: 1) {
items {
@@ -84,6 +84,7 @@ const {
isDataLoaded: areLogsFullyRead,
loading: logsLoading
} = useAutomationRunLogs({
projectId: computed(() => props.projectId),
automationId: computed(() => props.automationId),
runId: computed(() => props.run?.id)
})
@@ -165,10 +165,11 @@ export const useAutomationRunDetailsFns = () => {
}
export const useAutomationRunLogs = (params: {
projectId: MaybeRef<Optional<string>>
automationId: MaybeRef<Optional<string>>
runId: MaybeRef<Optional<string>>
}) => {
const { automationId, runId } = params
const { projectId, automationId, runId } = params
const apiOrigin = useApiOrigin()
const authToken = useAuthCookie()
@@ -178,7 +179,10 @@ export const useAutomationRunLogs = (params: {
const isStreamFinished = ref(false)
const url = computed(
() => `/api/automate/automations/${unref(automationId)}/runs/${unref(runId)}/logs`
() =>
`/api/v1/projects/${unref(projectId)}/automations/${unref(
automationId
)}/runs/${unref(runId)}/logs`
)
const key = computed(() => {
if (!unref(automationId) || !unref(runId)) return null
@@ -30,7 +30,7 @@ const documents = {
"\n fragment AutomateFunctionCreateDialogDoneStep_AutomateFunction on AutomateFunction {\n id\n repo {\n id\n url\n owner\n name\n }\n ...AutomationsFunctionsCard_AutomateFunction\n }\n": types.AutomateFunctionCreateDialogDoneStep_AutomateFunctionFragmentDoc,
"\n fragment AutomateFunctionCreateDialogTemplateStep_AutomateFunctionTemplate on AutomateFunctionTemplate {\n id\n title\n logo\n url\n }\n": types.AutomateFunctionCreateDialogTemplateStep_AutomateFunctionTemplateFragmentDoc,
"\n fragment AutomateFunctionPageHeader_Function on AutomateFunction {\n id\n name\n logo\n repo {\n id\n url\n owner\n name\n }\n releases(limit: 1) {\n totalCount\n }\n }\n": types.AutomateFunctionPageHeader_FunctionFragmentDoc,
"\n fragment AutomateFunctionPageInfo_AutomateFunction on AutomateFunction {\n id\n repo {\n id\n url\n owner\n name\n }\n automationCount\n description\n releases(limit: 1) {\n items {\n id\n inputSchema\n createdAt\n commitId\n ...AutomateFunctionPageParametersDialog_AutomateFunctionRelease\n }\n }\n }\n": types.AutomateFunctionPageInfo_AutomateFunctionFragmentDoc,
"\n fragment AutomateFunctionPageInfo_AutomateFunction on AutomateFunction {\n id\n repo {\n id\n url\n owner\n name\n }\n description\n releases(limit: 1) {\n items {\n id\n inputSchema\n createdAt\n commitId\n ...AutomateFunctionPageParametersDialog_AutomateFunctionRelease\n }\n }\n }\n": types.AutomateFunctionPageInfo_AutomateFunctionFragmentDoc,
"\n fragment AutomateFunctionPageParametersDialog_AutomateFunctionRelease on AutomateFunctionRelease {\n id\n inputSchema\n }\n": types.AutomateFunctionPageParametersDialog_AutomateFunctionReleaseFragmentDoc,
"\n fragment AutomateFunctionsPageHeader_Query on Query {\n activeUser {\n id\n automateInfo {\n hasAutomateGithubApp\n availableGithubOrgs\n }\n }\n serverInfo {\n automate {\n availableFunctionTemplates {\n ...AutomateFunctionCreateDialogTemplateStep_AutomateFunctionTemplate\n }\n }\n }\n }\n": types.AutomateFunctionsPageHeader_QueryFragmentDoc,
"\n fragment AutomateFunctionsPageItems_Query on Query {\n automateFunctions(limit: 20, filter: { search: $search }, cursor: $cursor) {\n totalCount\n items {\n id\n ...AutomationsFunctionsCard_AutomateFunction\n ...AutomateAutomationCreateDialog_AutomateFunction\n }\n cursor\n }\n }\n": types.AutomateFunctionsPageItems_QueryFragmentDoc,
@@ -444,7 +444,7 @@ export function graphql(source: "\n fragment AutomateFunctionPageHeader_Functio
/**
* The graphql function is used to parse GraphQL queries into a document that can be used by GraphQL clients.
*/
export function graphql(source: "\n fragment AutomateFunctionPageInfo_AutomateFunction on AutomateFunction {\n id\n repo {\n id\n url\n owner\n name\n }\n automationCount\n description\n releases(limit: 1) {\n items {\n id\n inputSchema\n createdAt\n commitId\n ...AutomateFunctionPageParametersDialog_AutomateFunctionRelease\n }\n }\n }\n"): (typeof documents)["\n fragment AutomateFunctionPageInfo_AutomateFunction on AutomateFunction {\n id\n repo {\n id\n url\n owner\n name\n }\n automationCount\n description\n releases(limit: 1) {\n items {\n id\n inputSchema\n createdAt\n commitId\n ...AutomateFunctionPageParametersDialog_AutomateFunctionRelease\n }\n }\n }\n"];
export function graphql(source: "\n fragment AutomateFunctionPageInfo_AutomateFunction on AutomateFunction {\n id\n repo {\n id\n url\n owner\n name\n }\n description\n releases(limit: 1) {\n items {\n id\n inputSchema\n createdAt\n commitId\n ...AutomateFunctionPageParametersDialog_AutomateFunctionRelease\n }\n }\n }\n"): (typeof documents)["\n fragment AutomateFunctionPageInfo_AutomateFunction on AutomateFunction {\n id\n repo {\n id\n url\n owner\n name\n }\n description\n releases(limit: 1) {\n items {\n id\n inputSchema\n createdAt\n commitId\n ...AutomateFunctionPageParametersDialog_AutomateFunctionRelease\n }\n }\n }\n"];
/**
* The graphql function is used to parse GraphQL queries into a document that can be used by GraphQL clients.
*/
File diff suppressed because one or more lines are too long
@@ -138,7 +138,6 @@ type AutomateFunction {
limit: Int
filter: AutomateFunctionReleasesFilter
): AutomateFunctionReleaseCollection!
automationCount: Int!
"""
SourceAppNames values from @speckle/shared. Empty array means - all of them
"""
@@ -217,6 +216,7 @@ input AutomateFunctionsFilter {
}
input AutomateFunctionRunStatusReportInput {
projectId: String!
functionRunId: String!
status: AutomateRunStatus!
statusMessage: String
@@ -0,0 +1,16 @@
import {
AutomationRunStatus,
AutomationRunStatuses
} from '@/modules/automate/helpers/types'
export const isFinished = (runStatus: AutomationRunStatus) => {
const finishedStatuses: AutomationRunStatus[] = [
AutomationRunStatuses.succeeded,
AutomationRunStatuses.failed,
AutomationRunStatuses.exception,
AutomationRunStatuses.timeout,
AutomationRunStatuses.canceled
]
return finishedStatuses.includes(runStatus)
}
@@ -177,10 +177,6 @@ export type GetRevisionsFunctions = (params: {
automationRevisionIds: string[]
}) => Promise<{ [automationRevisionId: string]: AutomateRevisionFunctionRecord[] }>
export type GetFunctionAutomationCounts = (params: {
functionIds: string[]
}) => Promise<{ [functionId: string]: number }>
export type CreateStoredAuthCode = (
params: Omit<AuthCodePayload, 'code'>
) => Promise<AuthCodePayload>
@@ -26,6 +26,7 @@ export type AutomateEventsPayloads = {
run: AutomationRunRecord
functionRun: AutomationFunctionRunRecord
automationId: string
projectId: string
}
}
@@ -159,7 +159,8 @@ const mocks: SpeckleModuleMocksConfig = FF_AUTOMATE_MODULE_ENABLED
(i): AutomationRevisionTriggerDefinitionGraphQLReturn => ({
triggerType: VersionCreationTriggerType,
triggeringId: i.model.id,
automationRevisionId: parent.id
automationRevisionId: parent.id,
projectId: (store.get('Project') as any).id
})
)
},
@@ -233,8 +234,7 @@ const mocks: SpeckleModuleMocksConfig = FF_AUTOMATE_MODULE_ENABLED
// return rand ? (store.get('LimitedUser') as any) : activeUser
// }
releases: () => store.get('AutomateFunctionReleaseCollection') as any,
automationCount: () => faker.number.int({ min: 0, max: 99 })
releases: () => store.get('AutomateFunctionReleaseCollection') as any
},
AutomateFunctionRelease: {
function: () => store.get('AutomateFunction') as any
@@ -116,33 +116,10 @@ import {
storeTokenScopesFactory,
storeUserServerAppTokenFactory
} from '@/modules/core/repositories/tokens'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
const { FF_AUTOMATE_MODULE_ENABLED } = getFeatureFlags()
const getUser = getUserFactory({ db })
const storeAutomation = storeAutomationFactory({ db })
const storeAutomationToken = storeAutomationTokenFactory({ db })
const storeAutomationRevision = storeAutomationRevisionFactory({ db })
const getAutomation = getAutomationFactory({ db })
const updateDbAutomation = updateAutomationFactory({ db })
const getLatestVersionAutomationRuns = getLatestVersionAutomationRunsFactory({ db })
const getFunctionRun = getFunctionRunFactory({ db })
const upsertAutomationFunctionRun = upsertAutomationFunctionRunFactory({ db })
const getFullAutomationRevisionMetadata = getFullAutomationRevisionMetadataFactory({
db
})
const getAutomationToken = getAutomationTokenFactory({ db })
const upsertAutomationRun = upsertAutomationRunFactory({ db })
const getAutomationTriggerDefinitions = getAutomationTriggerDefinitionsFactory({ db })
const getLatestAutomationRevision = getLatestAutomationRevisionFactory({ db })
const updateAutomationRun = updateAutomationRunFactory({ db })
const getAutomationRunsTotalCount = getAutomationRunsTotalCountFactory({ db })
const getAutomationRunsItems = getAutomationRunsItemsFactory({ db })
const getProjectAutomationsItems = getProjectAutomationsItemsFactory({ db })
const getProjectAutomationsTotalCount = getProjectAutomationsTotalCountFactory({ db })
const getBranchLatestCommits = getBranchLatestCommitsFactory({ db })
const validateStreamAccess = validateStreamAccessFactory({ authorizeResolver })
const createAppToken = createAppTokenFactory({
storeApiToken: storeApiTokenFactory({ db }),
@@ -183,34 +160,56 @@ export = (FF_AUTOMATE_MODULE_ENABLED
VersionCreatedTriggerDefinition: {
type: () => AutomateRunTriggerType.VersionCreated,
async model(parent, _args, ctx) {
return ctx.loaders.branches.getById.load(parent.triggeringId)
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
return ctx.loaders
.forRegion({ db: projectDb })
.branches.getById.load(parent.triggeringId)
}
},
VersionCreatedTrigger: {
type: () => AutomateRunTriggerType.VersionCreated,
async version(parent, _args, ctx) {
return ctx.loaders.commits.getById.load(parent.triggeringId)
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
return ctx.loaders
.forRegion({ db: projectDb })
.commits.getById.load(parent.triggeringId)
},
async model(parent, _args, ctx) {
return ctx.loaders.commits.getCommitBranch.load(parent.triggeringId)
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
return ctx.loaders
.forRegion({ db: projectDb })
.commits.getCommitBranch.load(parent.triggeringId)
}
},
ProjectTriggeredAutomationsStatusUpdatedMessage: {
async project(parent, _args, ctx) {
return ctx.loaders.streams.getStream.load(parent.projectId)
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
return ctx.loaders
.forRegion({ db: projectDb })
.streams.getStream.load(parent.projectId)
},
async model(parent, _args, ctx) {
return ctx.loaders.branches.getById.load(parent.modelId)
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
return ctx.loaders
.forRegion({ db: projectDb })
.branches.getById.load(parent.modelId)
},
async version(parent, _args, ctx) {
return ctx.loaders.commits.getById.load(parent.versionId)
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
return ctx.loaders
.forRegion({ db: projectDb })
.commits.getById.load(parent.versionId)
}
},
Project: {
async automation(parent, args, ctx) {
const res = ctx.loaders.streams.getAutomation
.forStream(parent.id)
const projectDb = await getProjectDbClient({ projectId: parent.id })
const res = ctx.loaders
.forRegion({ db: projectDb })
.streams.getAutomation.forStream(parent.id)
.load(args.id)
if (!res) {
if (!res) {
throw new AutomationNotFoundError()
@@ -220,14 +219,16 @@ export = (FF_AUTOMATE_MODULE_ENABLED
return res
},
async automations(parent, args) {
const projectDb = await getProjectDbClient({ projectId: parent.id })
const retrievalArgs: GetProjectAutomationsParams = {
projectId: parent.id,
args
}
const [{ items, cursor }, totalCount] = await Promise.all([
getProjectAutomationsItems(retrievalArgs),
getProjectAutomationsTotalCount(retrievalArgs)
getProjectAutomationsItemsFactory({ db: projectDb })(retrievalArgs),
getProjectAutomationsTotalCountFactory({ db: projectDb })(retrievalArgs)
])
return {
@@ -239,15 +240,21 @@ export = (FF_AUTOMATE_MODULE_ENABLED
},
Model: {
async automationsStatus(parent, _args, ctx) {
const projectDb = await getProjectDbClient({ projectId: parent.streamId })
const getLatestVersionAutomationRuns = getLatestVersionAutomationRunsFactory({
db: projectDb
})
const getStatus = getAutomationsStatusFactory({
getLatestVersionAutomationRuns
})
const modelId = parent.id
const projectId = parent.streamId
const latestCommit = await ctx.loaders.branches.getLatestCommit.load(
parent.id
)
const latestCommit = await ctx.loaders
.forRegion({ db: projectDb })
.branches.getLatestCommit.load(parent.id)
// if the model has no versions, no automations could have run
if (!latestCommit) return null
@@ -261,12 +268,18 @@ export = (FF_AUTOMATE_MODULE_ENABLED
},
Version: {
async automationsStatus(parent, _args, ctx) {
const projectDb = await getProjectDbClient({ projectId: parent.streamId })
const getStatus = getAutomationsStatusFactory({
getLatestVersionAutomationRuns
getLatestVersionAutomationRuns: getLatestVersionAutomationRunsFactory({
db: projectDb
})
})
const versionId = parent.id
const branch = await ctx.loaders.commits.getCommitBranch.load(versionId)
const branch = await ctx.loaders
.forRegion({ db: projectDb })
.commits.getCommitBranch.load(versionId)
if (!branch) throw Error('Invalid version Id')
const projectId = branch.streamId
@@ -280,19 +293,25 @@ export = (FF_AUTOMATE_MODULE_ENABLED
},
Automation: {
async currentRevision(parent, _args, ctx) {
return ctx.loaders.automations.getLatestAutomationRevision.load(parent.id)
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
const automationRevision = await ctx.loaders
.forRegion({ db: projectDb })
.automations.getLatestAutomationRevision.load(parent.id)
return { ...automationRevision, projectId: parent.projectId }
},
async runs(parent, args) {
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
const retrievalArgs = {
automationId: parent.id,
...args
}
const [{ items, cursor }, totalCount] = await Promise.all([
getAutomationRunsItems({
getAutomationRunsItemsFactory({ db: projectDb })({
args: retrievalArgs
}),
getAutomationRunsTotalCount({
getAutomationRunsTotalCountFactory({ db: projectDb })({
args: retrievalArgs
})
])
@@ -317,18 +336,26 @@ export = (FF_AUTOMATE_MODULE_ENABLED
},
AutomateRun: {
async trigger(parent, _args, ctx) {
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
const triggers =
parent.triggers ||
(await ctx.loaders.automations.getRunTriggers.load(parent.id))
(await ctx.loaders
.forRegion({ db: projectDb })
.automations.getRunTriggers.load(parent.id))
const trigger = triggers[0]
return trigger
return { ...trigger, projectId: parent.projectId }
},
async functionRuns(parent) {
return parent.functionRuns
},
async automation(parent, _args, ctx) {
return ctx.loaders.automations.getAutomation.load(parent.automationId)
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
return ctx.loaders
.forRegion({ db: projectDb })
.automations.getAutomation.load(parent.automationId)
},
status: (parent) => mapDbStatusToGqlStatus(parent.status)
},
@@ -363,24 +390,36 @@ export = (FF_AUTOMATE_MODULE_ENABLED
},
AutomationRevision: {
async triggerDefinitions(parent, _args, ctx) {
const triggers =
await ctx.loaders.automations.getRevisionTriggerDefinitions.load(parent.id)
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
return triggers
const triggers = await ctx.loaders
.forRegion({ db: projectDb })
.automations.getRevisionTriggerDefinitions.load(parent.id)
return triggers.map((trigger) => ({
...trigger,
projectId: parent.projectId
}))
},
async functions(parent, _args, ctx) {
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
const prepareInputs = getFunctionInputsForFrontendFactory({
getEncryptionKeyPairFor,
buildDecryptor,
redactWriteOnlyInputData
})
const fns = await ctx.loaders.automations.getRevisionFunctions.load(parent.id)
const fns = await ctx.loaders
.forRegion({ db: projectDb })
.automations.getRevisionFunctions.load(parent.id)
const fnsReleases = keyBy(
(
await ctx.loaders.automationsApi.getFunctionRelease.loadMany(
fns.map((fn) => [fn.functionId, fn.functionReleaseId])
)
await ctx.loaders
.forRegion({ db: projectDb })
.automationsApi.getFunctionRelease.loadMany(
fns.map((fn) => [fn.functionId, fn.functionReleaseId])
)
).filter(
(r): r is FunctionReleaseSchemaType => r !== null && !(r instanceof Error)
),
@@ -412,9 +451,6 @@ export = (FF_AUTOMATE_MODULE_ENABLED
}
},
AutomateFunction: {
async automationCount(parent, _args, ctx) {
return ctx.loaders.automations.getFunctionAutomationCount.load(parent.id)
},
async releases(parent, args) {
try {
// TODO: Replace w/ dataloader batch call, when/if possible
@@ -481,7 +517,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED
async createFunction(_parent, args, ctx) {
const create = createFunctionFromTemplateFactory({
createExecutionEngineFn: createFunction,
getUser,
getUser: getUserFactory({ db }),
createStoredAuthCode: createStoredAuthCodeFactory({
redis: getGenericRedis()
})
@@ -503,11 +539,13 @@ export = (FF_AUTOMATE_MODULE_ENABLED
},
ProjectAutomationMutations: {
async create(parent, { input }, ctx) {
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
const create = createAutomationFactory({
createAuthCode: createStoredAuthCodeFactory({ redis: getGenericRedis() }),
automateCreateAutomation: clientCreateAutomation,
storeAutomation,
storeAutomationToken,
storeAutomation: storeAutomationFactory({ db: projectDb }),
storeAutomationToken: storeAutomationTokenFactory({ db: projectDb }),
validateStreamAccess,
automationsEventsEmit: AutomationsEmitter.emit
})
@@ -522,9 +560,11 @@ export = (FF_AUTOMATE_MODULE_ENABLED
).automation
},
async update(parent, { input }, ctx) {
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
const update = validateAndUpdateAutomationFactory({
getAutomation,
updateAutomation: updateDbAutomation,
getAutomation: getAutomationFactory({ db: projectDb }),
updateAutomation: updateAutomationFactory({ db: projectDb }),
validateStreamAccess,
automationsEventsEmit: AutomationsEmitter.emit
})
@@ -537,10 +577,12 @@ export = (FF_AUTOMATE_MODULE_ENABLED
})
},
async createRevision(parent, { input }, ctx) {
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
const create = createAutomationRevisionFactory({
getAutomation,
storeAutomationRevision,
getBranchesByIds: getBranchesByIdsFactory({ db }),
getAutomation: getAutomationFactory({ db: projectDb }),
storeAutomationRevision: storeAutomationRevisionFactory({ db: projectDb }),
getBranchesByIds: getBranchesByIdsFactory({ db: projectDb }),
getFunctionRelease,
getEncryptionKeyPair,
getFunctionInputDecryptor: getFunctionInputDecryptorFactory({
@@ -559,10 +601,14 @@ export = (FF_AUTOMATE_MODULE_ENABLED
})
},
async trigger(parent, { automationId }, ctx) {
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
const trigger = manuallyTriggerAutomationFactory({
getAutomationTriggerDefinitions,
getAutomation,
getBranchLatestCommits,
getAutomationTriggerDefinitions: getAutomationTriggerDefinitionsFactory({
db: projectDb
}),
getAutomation: getAutomationFactory({ db: projectDb }),
getBranchLatestCommits: getBranchLatestCommitsFactory({ db: projectDb }),
triggerFunction: triggerAutomationRevisionRunFactory({
automateRunTrigger: triggerAutomationRun,
getEncryptionKeyPairFor,
@@ -571,11 +617,12 @@ export = (FF_AUTOMATE_MODULE_ENABLED
}),
createAppToken,
automateRunsEmitter: AutomateRunsEmitter.emit,
getAutomationToken,
upsertAutomationRun,
getFullAutomationRevisionMetadata,
getBranchLatestCommits,
getCommit: getCommitFactory({ db })
getAutomationToken: getAutomationTokenFactory({ db: projectDb }),
upsertAutomationRun: upsertAutomationRunFactory({ db: projectDb }),
getFullAutomationRevisionMetadata:
getFullAutomationRevisionMetadataFactory({ db: projectDb }),
getBranchLatestCommits: getBranchLatestCommitsFactory({ db: projectDb }),
getCommit: getCommitFactory({ db: projectDb })
}),
validateStreamAccess
})
@@ -590,11 +637,13 @@ export = (FF_AUTOMATE_MODULE_ENABLED
return automationRunId
},
async createTestAutomation(parent, { input }, ctx) {
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
const create = createTestAutomationFactory({
getEncryptionKeyPair,
getFunction,
storeAutomation,
storeAutomationRevision,
storeAutomation: storeAutomationFactory({ db: projectDb }),
storeAutomationRevision: storeAutomationRevisionFactory({ db: projectDb }),
validateStreamAccess,
automationsEventsEmit: AutomationsEmitter.emit
})
@@ -607,16 +656,30 @@ export = (FF_AUTOMATE_MODULE_ENABLED
})
},
async createTestAutomationRun(parent, { automationId }, ctx) {
const projectDb = await getProjectDbClient({ projectId: parent.projectId })
const create = createTestAutomationRunFactory({
getEncryptionKeyPairFor,
getFunctionInputDecryptor: getFunctionInputDecryptorFactory({
buildDecryptor
}),
getAutomation,
getLatestAutomationRevision,
getFullAutomationRevisionMetadata,
upsertAutomationRun,
getBranchLatestCommits,
getAutomation: getAutomationFactory({
db: projectDb
}),
getLatestAutomationRevision: getLatestAutomationRevisionFactory({
db: projectDb
}),
getFullAutomationRevisionMetadata: getFullAutomationRevisionMetadataFactory(
{
db: projectDb
}
),
upsertAutomationRun: upsertAutomationRunFactory({
db: projectDb
}),
getBranchLatestCommits: getBranchLatestCommitsFactory({
db: projectDb
}),
validateStreamAccess
})
@@ -752,10 +815,17 @@ export = (FF_AUTOMATE_MODULE_ENABLED
},
Mutation: {
async automateFunctionRunStatusReport(_parent, { input }) {
const projectDb = await getProjectDbClient({ projectId: input.projectId })
const deps: ReportFunctionRunStatusDeps = {
getAutomationFunctionRunRecord: getFunctionRun,
upsertAutomationFunctionRunRecord: upsertAutomationFunctionRun,
automationRunUpdater: updateAutomationRun,
getAutomationFunctionRunRecord: getFunctionRunFactory({
db: projectDb
}),
upsertAutomationFunctionRunRecord: upsertAutomationFunctionRunFactory({
db: projectDb
}),
automationRunUpdater: updateAutomationRunFactory({
db: projectDb
}),
runEventEmit: AutomateRunsEmitter.emit
}
@@ -42,13 +42,17 @@ export type AutomateFunctionReleaseGraphQLReturn = Pick<
export type AutomationGraphQLReturn = AutomationRecord
export type AutomationRevisionGraphQLReturn = AutomationRevisionRecord
export type AutomationRevisionGraphQLReturn = AutomationRevisionRecord & {
projectId: string
}
export type ProjectAutomationMutationsGraphQLReturn = { projectId: string }
export type AutomationRevisionTriggerDefinitionGraphQLReturn =
AutomationTriggerDefinitionRecord
export type AutomationRunTriggerGraphQLReturn = AutomationRunTriggerRecord
AutomationTriggerDefinitionRecord & { projectId: string }
export type AutomationRunTriggerGraphQLReturn = AutomationRunTriggerRecord & {
projectId: string
}
export type AutomationRevisionFunctionGraphQLReturn = Merge<
AutomateRevisionFunctionRecord,
@@ -61,9 +65,12 @@ export type AutomationRevisionFunctionGraphQLReturn = Merge<
export type AutomateRunGraphQLReturn = SetOptional<
AutomationRunWithTriggersFunctionRuns,
'triggers'
>
> & { projectId: string }
export type AutomateFunctionRunGraphQLReturn = AutomationFunctionRunRecord
export type AutomateFunctionRunGraphQLReturn = AutomationFunctionRunRecord & {
automationId: string
projectId: string
}
export type TriggeredAutomationsStatusGraphQLReturn = Merge<
TriggeredAutomationsStatus,
+257 -57
View File
@@ -1,4 +1,4 @@
import { moduleLogger } from '@/logging/logging'
import { automateLogger, moduleLogger } from '@/logging/logging'
import { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { VersionEvents, VersionsEmitter } from '@/modules/core/events/versionsEmitter'
import {
@@ -15,7 +15,7 @@ import {
getFullAutomationRunByIdFactory,
upsertAutomationRunFactory
} from '@/modules/automate/repositories/automations'
import { Scopes } from '@speckle/shared'
import { isNonNullable, Scopes, throwUncoveredError } from '@speckle/shared'
import { registerOrUpdateScopeFactory } from '@/modules/shared/repositories/scopes'
import { triggerAutomationRun } from '@/modules/automate/clients/executionEngine'
import logStreamRest from '@/modules/automate/rest/logStream'
@@ -24,18 +24,17 @@ import {
getFunctionInputDecryptorFactory
} from '@/modules/automate/services/encryption'
import { buildDecryptor } from '@/modules/shared/utils/libsodium'
import {
setupAutomationUpdateSubscriptionsFactory,
setupStatusUpdateSubscriptionsFactory
} from '@/modules/automate/services/subscriptions'
import { setupRunFinishedTrackingFactory } from '@/modules/automate/services/tracking'
import { getUserEmailFromAutomationRunFactory } from '@/modules/automate/services/tracking'
import authGithubAppRest from '@/modules/automate/rest/authGithubApp'
import { getFeatureFlags } from '@/modules/shared/helpers/envHelper'
import { TokenScopeData } from '@/modules/shared/domain/rolesAndScopes/types'
import db from '@/db/knex'
import { AutomationsEmitter } from '@/modules/automate/events/automations'
import { publish } from '@/modules/shared/utils/subscriptions'
import { AutomateRunsEmitter } from '@/modules/automate/events/runs'
import { db } from '@/db/knex'
import {
AutomationsEmitter,
AutomationsEvents
} from '@/modules/automate/events/automations'
import { ProjectSubscriptions, publish } from '@/modules/shared/utils/subscriptions'
import { AutomateRunsEmitter, AutomateRunsEvents } from '@/modules/automate/events/runs'
import { getBranchLatestCommitsFactory } from '@/modules/core/repositories/branches'
import { getCommitFactory } from '@/modules/core/repositories/commits'
import { legacyGetUserFactory } from '@/modules/core/repositories/users'
@@ -46,6 +45,18 @@ import {
storeTokenScopesFactory,
storeUserServerAppTokenFactory
} from '@/modules/core/repositories/tokens'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import {
ProjectAutomationsUpdatedMessageType,
ProjectTriggeredAutomationsStatusUpdatedMessageType
} from '@/modules/core/graph/generated/graphql'
import {
isVersionCreatedTriggerManifest,
RunTriggerSource,
VersionCreationTriggerType
} from '@/modules/automate/helpers/types'
import { isFinished } from '@/modules/automate/domain/logic'
import { mixpanel } from '@/modules/shared/utils/mixpanel'
const { FF_AUTOMATE_MODULE_ENABLED } = getFeatureFlags()
let quitListeners: Optional<() => void> = undefined
@@ -76,11 +87,6 @@ async function initScopes() {
}
const initializeEventListeners = () => {
const getAutomationRunFullTriggers = getAutomationRunFullTriggersFactory({ db })
const getFullAutomationRevisionMetadata = getFullAutomationRevisionMetadataFactory({
db
})
const createAppToken = createAppTokenFactory({
storeApiToken: storeApiTokenFactory({ db }),
storeTokenScopes: storeTokenScopesFactory({ db }),
@@ -89,56 +95,250 @@ const initializeEventListeners = () => {
}),
storeUserServerAppToken: storeUserServerAppTokenFactory({ db })
})
const triggerFn = triggerAutomationRevisionRunFactory({
automateRunTrigger: triggerAutomationRun,
getEncryptionKeyPairFor,
getFunctionInputDecryptor: getFunctionInputDecryptorFactory({
buildDecryptor
}),
createAppToken,
automateRunsEmitter: AutomateRunsEmitter.emit,
getAutomationToken: getAutomationTokenFactory({ db }),
upsertAutomationRun: upsertAutomationRunFactory({ db }),
getFullAutomationRevisionMetadata,
getBranchLatestCommits: getBranchLatestCommitsFactory({ db }),
getCommit: getCommitFactory({ db })
})
const setupStatusUpdateSubscriptionsInvoke = setupStatusUpdateSubscriptionsFactory({
getAutomationRunFullTriggers,
publish,
automateRunsEventsListener: AutomateRunsEmitter.listen
})
const setupAutomationUpdateSubscriptionsInvoke =
setupAutomationUpdateSubscriptionsFactory({
automationsEmitterListen: AutomationsEmitter.listen,
publish
})
const setupRunFinishedTrackingInvoke = setupRunFinishedTrackingFactory({
getFullAutomationRevisionMetadata,
getUser: legacyGetUserFactory({ db }),
getCommit: getCommitFactory({ db }),
getFullAutomationRunById: getFullAutomationRunByIdFactory({ db }),
automateRunsEventListener: AutomateRunsEmitter.listen
})
const getAutomation = getAutomationFactory({ db })
const getAutomationRevision = getAutomationRevisionFactory({ db })
const getActiveTriggerDefinitions = getActiveTriggerDefinitionsFactory({ db })
// TODO: Use new event bus
const quitters = [
// Automation trigger events
VersionsEmitter.listen(
VersionEvents.Created,
async ({ modelId, version, projectId }) => {
const projectDb = await getProjectDbClient({ projectId })
await onModelVersionCreateFactory({
getAutomation,
getAutomationRevision,
getTriggers: getActiveTriggerDefinitions,
triggerFunction: triggerFn
getAutomation: getAutomationFactory({ db: projectDb }),
getAutomationRevision: getAutomationRevisionFactory({ db: projectDb }),
getTriggers: getActiveTriggerDefinitionsFactory({ db: projectDb }),
triggerFunction: triggerAutomationRevisionRunFactory({
automateRunTrigger: triggerAutomationRun,
getEncryptionKeyPairFor,
getFunctionInputDecryptor: getFunctionInputDecryptorFactory({
buildDecryptor
}),
createAppToken,
automateRunsEmitter: AutomateRunsEmitter.emit,
getAutomationToken: getAutomationTokenFactory({ db: projectDb }),
upsertAutomationRun: upsertAutomationRunFactory({ db: projectDb }),
getFullAutomationRevisionMetadata: getFullAutomationRevisionMetadataFactory(
{ db: projectDb }
),
getBranchLatestCommits: getBranchLatestCommitsFactory({ db: projectDb }),
getCommit: getCommitFactory({ db: projectDb })
})
})({ modelId, versionId: version.id, projectId })
}
),
setupStatusUpdateSubscriptionsInvoke(),
setupAutomationUpdateSubscriptionsInvoke(),
setupRunFinishedTrackingInvoke()
// Automation management events
AutomationsEmitter.listen(AutomationsEvents.Created, async ({ automation }) => {
await publish(ProjectSubscriptions.ProjectAutomationsUpdated, {
projectId: automation.projectId,
projectAutomationsUpdated: {
type: ProjectAutomationsUpdatedMessageType.Created,
automationId: automation.id,
automation,
revision: null
}
})
}),
AutomationsEmitter.listen(AutomationsEvents.Updated, async ({ automation }) => {
await publish(ProjectSubscriptions.ProjectAutomationsUpdated, {
projectId: automation.projectId,
projectAutomationsUpdated: {
type: ProjectAutomationsUpdatedMessageType.Updated,
automationId: automation.id,
automation,
revision: null
}
})
}),
AutomationsEmitter.listen(
AutomationsEvents.CreatedRevision,
async ({ automation, revision }) => {
await publish(ProjectSubscriptions.ProjectAutomationsUpdated, {
projectId: automation.projectId,
projectAutomationsUpdated: {
type: ProjectAutomationsUpdatedMessageType.CreatedRevision,
automationId: automation.id,
automation,
revision: {
...revision,
projectId: automation.projectId
}
}
})
}
),
// Automation run lifecycle events
AutomateRunsEmitter.listen(
AutomateRunsEvents.Created,
async ({ manifests, run, automation }) => {
const validatedManifests = manifests
.map((manifest) => {
if (isVersionCreatedTriggerManifest(manifest)) {
return manifest
} else {
automateLogger.error('Unexpected run trigger manifest type', {
manifest
})
}
return null
})
.filter(isNonNullable)
await Promise.all(
validatedManifests.map(async (manifest) => {
await publish(
ProjectSubscriptions.ProjectTriggeredAutomationsStatusUpdated,
{
projectId: manifest.projectId,
projectTriggeredAutomationsStatusUpdated: {
...manifest,
run: {
...run,
automationId: automation.id,
functionRuns: run.functionRuns.map((functionRun) => ({
...functionRun,
runId: run.id
})),
triggers: run.triggers.map((trigger) => ({
...trigger,
automationRunId: run.id
})),
projectId: manifest.projectId
},
type: ProjectTriggeredAutomationsStatusUpdatedMessageType.RunCreated
}
}
)
})
)
}
),
AutomateRunsEmitter.listen(
AutomateRunsEvents.StatusUpdated,
async ({ run, functionRun, automationId, projectId }) => {
const projectDb = await getProjectDbClient({ projectId })
const triggers = await getAutomationRunFullTriggersFactory({ db: projectDb })({
automationRunId: run.id
})
if (triggers[VersionCreationTriggerType].length) {
const versionCreation = triggers[VersionCreationTriggerType]
await Promise.all(
versionCreation.map(async (trigger) => {
await publish(
ProjectSubscriptions.ProjectTriggeredAutomationsStatusUpdated,
{
projectId: trigger.model.streamId,
projectTriggeredAutomationsStatusUpdated: {
projectId: trigger.model.streamId,
modelId: trigger.model.id,
versionId: trigger.version.id,
run: {
...run,
functionRuns: [functionRun],
automationId,
triggers: undefined,
projectId: trigger.model.streamId
},
type: ProjectTriggeredAutomationsStatusUpdatedMessageType.RunUpdated
}
}
)
})
)
}
}
),
// Mixpanel events
AutomateRunsEmitter.listen(
AutomateRunsEvents.StatusUpdated,
async ({ run, functionRun, automationId, projectId }) => {
const projectDb = await getProjectDbClient({ projectId })
if (!isFinished(run.status)) return
const automationWithRevision = await getFullAutomationRevisionMetadataFactory({
db: projectDb
})(run.automationRevisionId)
const fullRun = await getFullAutomationRunByIdFactory({ db: projectDb })(run.id)
if (!fullRun) throw new Error('This should never happen')
if (!automationWithRevision) {
automateLogger.error(
{
run
},
'Run revision not found unexpectedly'
)
return
}
const userEmail = await getUserEmailFromAutomationRunFactory({
getFullAutomationRevisionMetadata: getFullAutomationRevisionMetadataFactory({
db: projectDb
}),
getFullAutomationRunById: getFullAutomationRunByIdFactory({ db: projectDb }),
getCommit: getCommitFactory({ db: projectDb }),
getUser: legacyGetUserFactory({ db: projectDb })
})(fullRun, automationWithRevision.projectId)
const mp = mixpanel({ userEmail, req: undefined })
await mp.track('Automate Function Run Finished', {
automationId,
automationRevisionId: automationWithRevision.id,
automationName: automationWithRevision.name,
runId: run.id,
functionRunId: functionRun.id,
status: functionRun.status,
durationInSeconds: functionRun.elapsed / 1000,
durationInMilliseconds: functionRun.elapsed
})
}
),
AutomateRunsEmitter.listen(
AutomateRunsEvents.Created,
async ({ automation, run: automationRun, source, manifests }) => {
const manifest = manifests.at(0)
if (!manifest || !isVersionCreatedTriggerManifest(manifest)) {
automateLogger.error('Unexpected run trigger manifest type', {
manifest
})
return
}
const projectDb = await getProjectDbClient({ projectId: manifest.projectId })
// all triggers, that are automatic result of an action are in a need to be tracked
switch (source) {
case RunTriggerSource.Automatic: {
const userEmail = await getUserEmailFromAutomationRunFactory({
getFullAutomationRevisionMetadata:
getFullAutomationRevisionMetadataFactory({ db: projectDb }),
getFullAutomationRunById: getFullAutomationRunByIdFactory({
db: projectDb
}),
getCommit: getCommitFactory({ db: projectDb }),
getUser: legacyGetUserFactory({ db: projectDb })
})(automationRun, automation.projectId)
const mp = mixpanel({ userEmail, req: undefined })
await mp.track('Automation Run Triggered', {
automationId: automation.id,
automationName: automation.name,
automationRunId: automationRun.id,
projectId: automation.projectId,
source
})
break
}
// runs created from a user interaction are tracked in the frontend
case RunTriggerSource.Manual:
return
default:
throwUncoveredError(source)
}
}
)
]
return () => {
@@ -12,7 +12,6 @@ import {
GetAutomationTriggerDefinitions,
GetFullAutomationRevisionMetadata,
GetFullAutomationRunById,
GetFunctionAutomationCounts,
GetFunctionRun,
GetLatestAutomationRevision,
GetLatestAutomationRevisions,
@@ -78,7 +77,7 @@ import {
import { Nullable, StreamRoles, isNullOrUndefined } from '@speckle/shared'
import cryptoRandomString from 'crypto-random-string'
import { Knex } from 'knex'
import _, { clamp, groupBy, keyBy, pick, reduce } from 'lodash'
import _, { clamp, groupBy, keyBy, pick } from 'lodash'
import { SetOptional, SetRequired } from 'type-fest'
const tables = {
@@ -616,38 +615,6 @@ export const getRevisionsFunctionsFactory =
return groupBy(await q, (r) => r.automationRevisionId)
}
export const getFunctionAutomationCountsFactory =
(deps: { db: Knex }): GetFunctionAutomationCounts =>
async (params: { functionIds: string[] }) => {
const { functionIds } = params
if (!functionIds.length) return {}
const q = tables
.automationRevisionFunctions(deps.db)
.select<Array<{ functionId: string; count: string }>>([
AutomationRevisionFunctions.col.functionId,
knex.raw('count(distinct ??) as "count"', [
AutomationRevisions.col.automationId
])
])
.innerJoin(
AutomationRevisions.name,
AutomationRevisions.col.id,
AutomationRevisionFunctions.col.automationRevisionId
)
.whereIn(AutomationRevisionFunctions.col.functionId, functionIds)
.groupBy(AutomationRevisionFunctions.col.functionId)
return reduce(
await q,
(acc, r) => {
acc[r.functionId] = parseInt(r.count)
return acc
},
{} as Record<string, number>
)
}
type GetAutomationRunsArgs = AutomationRunsArgs & {
automationId: string
revisionId?: string
@@ -664,6 +631,11 @@ const getAutomationRunsTotalCountBaseQueryFactory =
AutomationRevisions.col.id,
AutomationRuns.col.automationRevisionId
)
.innerJoin(
Automations.name,
Automations.col.id,
AutomationRevisions.col.automationId
)
.where(AutomationRevisions.col.automationId, args.automationId)
if (args.revisionId?.length) {
@@ -696,6 +668,7 @@ export const getAutomationRunsItemsFactory =
// Attach trigger & function runs
q.select([
knex.raw(`(array_agg(??))[1] as "projectId"`, [Automations.col.projectId]),
AutomationRuns.groupArray('runs'),
AutomationRunTriggers.groupArray('triggers'),
AutomationFunctionRuns.groupArray('functionRuns'),
@@ -713,7 +686,6 @@ export const getAutomationRunsItemsFactory =
AutomationFunctionRuns.col.runId,
AutomationRuns.col.id
)
.groupBy(AutomationRuns.col.id)
.orderBy([
{ column: AutomationRuns.col.updatedAt, order: 'desc' },
@@ -730,14 +702,16 @@ export const getAutomationRunsItemsFactory =
triggers: AutomationRunTriggerRecord[]
functionRuns: AutomationFunctionRunRecord[]
automationId: string
projectId: string
}>
const items = res.map(
(r): AutomationRunWithTriggersFunctionRuns => ({
(r): AutomationRunWithTriggersFunctionRuns & { projectId: string } => ({
...formatJsonArrayRecords(r.runs)[0],
triggers: formatJsonArrayRecords(r.triggers),
functionRuns: formatJsonArrayRecords(r.functionRuns),
automationId: r.automationId
automationId: r.automationId,
projectId: r.projectId
})
)
@@ -1,12 +1,10 @@
import { db } from '@/db/knex'
import { getAutomationRunLogs } from '@/modules/automate/clients/executionEngine'
import { ExecutionEngineFailedResponseError } from '@/modules/automate/errors/executionEngine'
import {
getAutomationProjectFactory,
getAutomationRunWithTokenFactory
} from '@/modules/automate/repositories/automations'
import { getAutomationRunWithTokenFactory } from '@/modules/automate/repositories/automations'
import { corsMiddleware } from '@/modules/core/configs/cors'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import {
validateRequiredStreamFactory,
validateResourceAccess,
@@ -21,7 +19,7 @@ import { Application } from 'express'
export default (app: Application) => {
app.get(
'/api/automate/automations/:automationId/runs/:runId/logs',
'/api/v1/projects/:streamId/automations/:automationId/runs/:runId/logs',
corsMiddleware(),
authMiddlewareCreator([
validateServerRoleBuilderFactory({
@@ -29,8 +27,7 @@ export default (app: Application) => {
})({ requiredRole: Roles.Server.Guest }),
validateScope({ requiredScope: Scopes.Streams.Read }),
validateRequiredStreamFactory({
getStream: getStreamFactory({ db }),
getAutomationProject: getAutomationProjectFactory({ db })
getStream: getStreamFactory({ db })
}),
validateStreamRoleBuilderFactory({ getRoles: getRolesFactory({ db }) })({
requiredRole: Roles.Stream.Owner
@@ -38,10 +35,13 @@ export default (app: Application) => {
validateResourceAccess
]),
async (req, res) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
const automationId = req.params.automationId
const runId = req.params.runId
const getAutomationRunWithToken = getAutomationRunWithTokenFactory({ db })
const getAutomationRunWithToken = getAutomationRunWithTokenFactory({
db: projectDb
})
const run = await getAutomationRunWithToken({
automationId,
automationRunId: runId
@@ -573,7 +573,8 @@ export const getAutomationsStatusFactory =
...r,
status: resolveStatusFromFunctionRunStatuses(
r.functionRuns.map((fr) => fr.status)
)
),
projectId: params.projectId
}))
const failedAutomations = runsWithUpdatedStatus.filter(
@@ -100,7 +100,7 @@ export const reportFunctionRunStatusFactory =
params: Pick<
AutomationFunctionRunRecord,
'runId' | 'status' | 'statusMessage' | 'contextView' | 'results'
>
> & { projectId: string }
): Promise<boolean> => {
const {
getAutomationFunctionRunRecord,
@@ -108,7 +108,7 @@ export const reportFunctionRunStatusFactory =
automationRunUpdater,
runEventEmit
} = deps
const { runId, ...statusReportData } = params
const { projectId, runId, ...statusReportData } = params
const currentFunctionRunRecordResult = await getAutomationFunctionRunRecord(runId)
@@ -150,7 +150,8 @@ export const reportFunctionRunStatusFactory =
await runEventEmit(AutomateRunsEmitter.events.StatusUpdated, {
run: updatedRun,
functionRun: nextFunctionRunRecord,
automationId
automationId,
projectId
})
return true
@@ -1,176 +0,0 @@
import { automateLogger } from '@/logging/logging'
import { GetAutomationRunFullTriggers } from '@/modules/automate/domain/operations'
import {
AutomationsEmitter,
AutomationsEventsListen
} from '@/modules/automate/events/automations'
import {
AutomateRunsEmitter,
AutomateRunsEventsListener
} from '@/modules/automate/events/runs'
import {
VersionCreationTriggerType,
isVersionCreatedTriggerManifest
} from '@/modules/automate/helpers/types'
import {
ProjectAutomationsUpdatedMessageType,
ProjectTriggeredAutomationsStatusUpdatedMessageType
} from '@/modules/core/graph/generated/graphql'
import {
ProjectSubscriptions,
PublishSubscription
} from '@/modules/shared/utils/subscriptions'
import { isNonNullable } from '@speckle/shared'
// TODO: Update AutomateRuns subscription
export const setupAutomationUpdateSubscriptionsFactory =
(deps: {
automationsEmitterListen: AutomationsEventsListen
publish: PublishSubscription
}) =>
() => {
const quitters = [
deps.automationsEmitterListen(
AutomationsEmitter.events.Created,
async ({ automation }) => {
await deps.publish(ProjectSubscriptions.ProjectAutomationsUpdated, {
projectId: automation.projectId,
projectAutomationsUpdated: {
type: ProjectAutomationsUpdatedMessageType.Created,
automationId: automation.id,
automation,
revision: null
}
})
}
),
deps.automationsEmitterListen(
AutomationsEmitter.events.Updated,
async ({ automation }) => {
await deps.publish(ProjectSubscriptions.ProjectAutomationsUpdated, {
projectId: automation.projectId,
projectAutomationsUpdated: {
type: ProjectAutomationsUpdatedMessageType.Updated,
automationId: automation.id,
automation,
revision: null
}
})
}
),
deps.automationsEmitterListen(
AutomationsEmitter.events.CreatedRevision,
async ({ automation, revision }) => {
await deps.publish(ProjectSubscriptions.ProjectAutomationsUpdated, {
projectId: automation.projectId,
projectAutomationsUpdated: {
type: ProjectAutomationsUpdatedMessageType.CreatedRevision,
automationId: automation.id,
automation,
revision
}
})
}
)
]
return () => quitters.forEach((quitter) => quitter())
}
export type SetupStatusUpdateSubscriptionsDeps = {
getAutomationRunFullTriggers: GetAutomationRunFullTriggers
automateRunsEventsListener: AutomateRunsEventsListener
publish: PublishSubscription
}
export const setupStatusUpdateSubscriptionsFactory =
(deps: SetupStatusUpdateSubscriptionsDeps) => () => {
const { getAutomationRunFullTriggers, automateRunsEventsListener, publish } = deps
const quitters = [
automateRunsEventsListener(
AutomateRunsEmitter.events.Created,
async ({ manifests, run, automation }) => {
const validatedManifests = manifests
.map((manifest) => {
if (isVersionCreatedTriggerManifest(manifest)) {
return manifest
} else {
automateLogger.error('Unexpected run trigger manifest type', {
manifest
})
}
return null
})
.filter(isNonNullable)
await Promise.all(
validatedManifests.map(async (manifest) => {
await publish(
ProjectSubscriptions.ProjectTriggeredAutomationsStatusUpdated,
{
projectId: manifest.projectId,
projectTriggeredAutomationsStatusUpdated: {
...manifest,
run: {
...run,
automationId: automation.id,
functionRuns: run.functionRuns.map((functionRun) => ({
...functionRun,
runId: run.id
})),
triggers: run.triggers.map((trigger) => ({
...trigger,
automationRunId: run.id
}))
},
type: ProjectTriggeredAutomationsStatusUpdatedMessageType.RunCreated
}
}
)
})
)
}
),
automateRunsEventsListener(
AutomateRunsEmitter.events.StatusUpdated,
async ({ run, functionRun, automationId }) => {
const triggers = await getAutomationRunFullTriggers({
automationRunId: run.id
})
if (triggers[VersionCreationTriggerType].length) {
const versionCreation = triggers[VersionCreationTriggerType]
await Promise.all(
versionCreation.map(async (trigger) => {
await publish(
ProjectSubscriptions.ProjectTriggeredAutomationsStatusUpdated,
{
projectId: trigger.model.streamId,
projectTriggeredAutomationsStatusUpdated: {
projectId: trigger.model.streamId,
modelId: trigger.model.id,
versionId: trigger.version.id,
run: {
...run,
functionRuns: [functionRun],
automationId,
triggers: undefined
},
type: ProjectTriggeredAutomationsStatusUpdatedMessageType.RunUpdated
}
}
)
})
)
}
}
)
]
return () => quitters.forEach((quitter) => quitter())
}
@@ -1,38 +1,12 @@
import { automateLogger } from '@/logging/logging'
import {
GetFullAutomationRevisionMetadata,
GetFullAutomationRunById
} from '@/modules/automate/domain/operations'
import {
AutomateRunsEmitter,
AutomateRunsEventsListener
} from '@/modules/automate/events/runs'
import {
AutomationFunctionRunRecord,
AutomationRunRecord,
AutomationRunStatus,
AutomationRunStatuses,
AutomationWithRevision,
RunTriggerSource
} from '@/modules/automate/helpers/types'
import { InsertableAutomationRun } from '@/modules/automate/repositories/automations'
import { GetCommit } from '@/modules/core/domain/commits/operations'
import { LegacyGetUser } from '@/modules/core/domain/users/operations'
import { mixpanel } from '@/modules/shared/utils/mixpanel'
import { throwUncoveredError } from '@speckle/shared'
const isFinished = (runStatus: AutomationRunStatus) => {
const finishedStatuses: AutomationRunStatus[] = [
AutomationRunStatuses.succeeded,
AutomationRunStatuses.failed,
AutomationRunStatuses.exception,
AutomationRunStatuses.timeout,
AutomationRunStatuses.canceled
]
return finishedStatuses.includes(runStatus)
}
export type AutomateTrackingDeps = {
getFullAutomationRevisionMetadata: GetFullAutomationRevisionMetadata
getFullAutomationRunById: GetFullAutomationRunById
@@ -40,54 +14,7 @@ export type AutomateTrackingDeps = {
getUser: LegacyGetUser
}
const onAutomationRunStatusUpdatedFactory =
(deps: AutomateTrackingDeps) =>
async ({
run,
functionRun,
automationId
}: {
run: AutomationRunRecord
functionRun: AutomationFunctionRunRecord
automationId: string
}) => {
if (!isFinished(run.status)) return
const automationWithRevision = await deps.getFullAutomationRevisionMetadata(
run.automationRevisionId
)
const fullRun = await deps.getFullAutomationRunById(run.id)
if (!fullRun) throw new Error('This should never happen')
if (!automationWithRevision) {
automateLogger.error(
{
run
},
'Run revision not found unexpectedly'
)
return
}
const userEmail = await getUserEmailFromAutomationRunFactory(deps)(
fullRun,
automationWithRevision.projectId
)
const mp = mixpanel({ userEmail, req: undefined })
await mp.track('Automate Function Run Finished', {
automationId,
automationRevisionId: automationWithRevision.id,
automationName: automationWithRevision.name,
runId: run.id,
functionRunId: functionRun.id,
status: functionRun.status,
durationInSeconds: functionRun.elapsed / 1000,
durationInMilliseconds: functionRun.elapsed
})
}
const getUserEmailFromAutomationRunFactory =
export const getUserEmailFromAutomationRunFactory =
(deps: AutomateTrackingDeps) =>
async (
automationRun: Pick<InsertableAutomationRun, 'triggers'>,
@@ -114,60 +41,3 @@ const getUserEmailFromAutomationRunFactory =
}
return userEmail
}
const onRunCreatedFactory =
(deps: AutomateTrackingDeps) =>
async ({
automation,
run: automationRun,
source
}: {
automation: AutomationWithRevision
run: InsertableAutomationRun
source: RunTriggerSource
}) => {
// all triggers, that are automatic result of an action are in a need to be tracked
switch (source) {
case RunTriggerSource.Automatic: {
const userEmail = await getUserEmailFromAutomationRunFactory(deps)(
automationRun,
automation.projectId
)
const mp = mixpanel({ userEmail, req: undefined })
await mp.track('Automation Run Triggered', {
automationId: automation.id,
automationName: automation.name,
automationRunId: automationRun.id,
projectId: automation.projectId,
source
})
break
}
// runs created from a user interaction are tracked in the frontend
case RunTriggerSource.Manual:
return
default:
throwUncoveredError(source)
}
}
export const setupRunFinishedTrackingFactory =
(
deps: AutomateTrackingDeps & {
automateRunsEventListener: AutomateRunsEventsListener
}
) =>
() => {
const quitters = [
deps.automateRunsEventListener(
AutomateRunsEmitter.events.StatusUpdated,
onAutomationRunStatusUpdatedFactory(deps)
),
deps.automateRunsEventListener(
AutomateRunsEmitter.events.Created,
onRunCreatedFactory(deps)
)
]
return () => quitters.forEach((quitter) => quitter())
}
@@ -441,7 +441,8 @@ const createAppToken = createAppTokenFactory({
manifest: <VersionCreatedTriggerManifest>{
versionId: version.id,
modelId: trigger.triggeringId,
triggerType: trigger.triggerType
triggerType: trigger.triggerType,
projectId: project.id
},
source: RunTriggerSource.Manual
})
@@ -541,7 +542,8 @@ const createAppToken = createAppTokenFactory({
manifest: <VersionCreatedTriggerManifest>{
versionId: version.id,
modelId: trigger.triggeringId,
triggerType: trigger.triggerType
triggerType: trigger.triggerType,
projectId: project.id
},
source: RunTriggerSource.Manual
})
@@ -1255,7 +1257,8 @@ const createAppToken = createAppTokenFactory({
status: mapGqlStatusToDbStatus(AutomateRunStatus.Succeeded),
statusMessage: null,
results: null,
contextView: null
contextView: null,
projectId: testUserStream.id
}
await expect(report(params)).to.eventually.be.rejectedWith(
@@ -1272,7 +1275,8 @@ const createAppToken = createAppTokenFactory({
status: mapGqlStatusToDbStatus(AutomateRunStatus.Pending),
statusMessage: null,
results: null,
contextView: null
contextView: null,
projectId: testUserStream.id
}
await expect(report(params)).to.eventually.be.rejectedWith(
@@ -1314,7 +1318,8 @@ const createAppToken = createAppTokenFactory({
status: mapGqlStatusToDbStatus(AutomateRunStatus.Succeeded),
statusMessage: null,
results: val as unknown as Automate.AutomateTypes.ResultsSchema,
contextView: null
contextView: null,
projectId: testUserStream.id
}
await expect(report(params)).to.eventually.be.rejectedWith(
@@ -1332,7 +1337,8 @@ const createAppToken = createAppTokenFactory({
status: mapGqlStatusToDbStatus(AutomateRunStatus.Succeeded),
statusMessage: null,
results: null,
contextView: 'invalid-url'
contextView: 'invalid-url',
projectId: testUserStream.id
}
await expect(report(params)).to.eventually.be.rejectedWith(
@@ -1351,7 +1357,8 @@ const createAppToken = createAppTokenFactory({
status: mapGqlStatusToDbStatus(AutomateRunStatus.Succeeded),
statusMessage: null,
results: null,
contextView
contextView,
projectId: testUserStream.id
}
await expect(report(params)).to.eventually.be.true
+9 -26
View File
@@ -42,13 +42,11 @@ import {
fullyDeleteBlobFactory
} from '@/modules/blobstorage/services/management'
import { getRolesFactory } from '@/modules/shared/repositories/roles'
import { getAutomationProjectFactory } from '@/modules/automate/repositories/automations'
import { adminOverrideEnabled } from '@/modules/shared/helpers/envHelper'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { Request, Response } from 'express'
import { ensureError } from '@speckle/shared'
import { SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { Knex } from 'knex'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
const ensureConditions = async () => {
@@ -89,26 +87,23 @@ const errorHandler: ErrorHandler = async (req, res, callback) => {
export const init: SpeckleModule['init'] = async (app) => {
await ensureConditions()
const createStreamWritePermissions = ({ projectDb }: { projectDb: Knex }) =>
const createStreamWritePermissions = () =>
streamWritePermissionsPipelineFactory({
getRoles: getRolesFactory({ db }),
getStream: getStreamFactory({ db }),
getAutomationProject: getAutomationProjectFactory({ db: projectDb })
getStream: getStreamFactory({ db })
})
const createStreamReadPermissions = ({ projectDb }: { projectDb: Knex }) =>
const createStreamReadPermissions = () =>
streamReadPermissionsPipelineFactory({
adminOverrideEnabled,
getRoles: getRolesFactory({ db }),
getStream: getStreamFactory({ db }),
getAutomationProject: getAutomationProjectFactory({ db: projectDb })
getStream: getStreamFactory({ db })
})
app.post(
'/api/stream/:streamId/blob',
async (req, res, next) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
await authMiddlewareCreator([
...createStreamWritePermissions({ projectDb }),
...createStreamWritePermissions(),
// todo should we add public comments upload escape hatch?
allowForAllRegisteredUsersOnPublicStreamsWithPublicComments
])(req, res, next)
@@ -243,9 +238,8 @@ export const init: SpeckleModule['init'] = async (app) => {
app.post(
'/api/stream/:streamId/blob/diff',
async (req, res, next) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
await authMiddlewareCreator([
...createStreamReadPermissions({ projectDb }),
...createStreamReadPermissions(),
allowForAllRegisteredUsersOnPublicStreamsWithPublicComments,
allowForRegisteredUsersOnPublicStreamsEvenWithoutRole,
allowAnonymousUsersOnPublicStreams
@@ -272,9 +266,8 @@ export const init: SpeckleModule['init'] = async (app) => {
app.get(
'/api/stream/:streamId/blob/:blobId',
async (req, res, next) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
await authMiddlewareCreator([
...createStreamReadPermissions({ projectDb }),
...createStreamReadPermissions(),
allowForAllRegisteredUsersOnPublicStreamsWithPublicComments,
allowForRegisteredUsersOnPublicStreamsEvenWithoutRole,
allowAnonymousUsersOnPublicStreams
@@ -307,12 +300,7 @@ export const init: SpeckleModule['init'] = async (app) => {
app.delete(
'/api/stream/:streamId/blob/:blobId',
async (req, res, next) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
await authMiddlewareCreator(createStreamReadPermissions({ projectDb }))(
req,
res,
next
)
await authMiddlewareCreator(createStreamReadPermissions())(req, res, next)
},
async (req, res) => {
errorHandler(req, res, async (req, res) => {
@@ -335,12 +323,7 @@ export const init: SpeckleModule['init'] = async (app) => {
app.get(
'/api/stream/:streamId/blobs',
async (req, res, next) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
await authMiddlewareCreator(createStreamReadPermissions({ projectDb }))(
req,
res,
next
)
await authMiddlewareCreator(createStreamReadPermissions())(req, res, next)
},
async (req, res) => {
let fileName = req.query.fileName
@@ -65,7 +65,6 @@ import {
getAutomationRevisionsFactory,
getAutomationRunsTriggersFactory,
getAutomationsFactory,
getFunctionAutomationCountsFactory,
getLatestAutomationRevisionsFactory,
getRevisionsFunctionsFactory,
getRevisionsTriggerDefinitionsFactory
@@ -110,7 +109,6 @@ const dataLoadersDefinition = defineRequestDataloaders(
const getLatestAutomationRevisions = getLatestAutomationRevisionsFactory({ db })
const getRevisionsTriggerDefinitions = getRevisionsTriggerDefinitionsFactory({ db })
const getRevisionsFunctions = getRevisionsFunctionsFactory({ db })
const getFunctionAutomationCounts = getFunctionAutomationCountsFactory({ db })
const getStreamCommentCounts = getStreamCommentCountsFactory({ db })
const getAutomationRunsTriggers = getAutomationRunsTriggersFactory({ db })
const getCommentsResources = getCommentsResourcesFactory({ db })
@@ -547,14 +545,6 @@ const dataLoadersDefinition = defineRequestDataloaders(
})
},
automations: {
getFunctionAutomationCount: createLoader<string, number>(
async (functionIds) => {
const results = await getFunctionAutomationCounts({
functionIds: functionIds.slice()
})
return functionIds.map((i) => results[i] || 0)
}
),
getAutomation: createLoader<string, Nullable<AutomationRecord>>(async (ids) => {
const results = keyBy(
await getAutomations({ automationIds: ids.slice() }),
@@ -236,7 +236,6 @@ export type AutomateAuthCodePayloadTest = {
export type AutomateFunction = {
__typename?: 'AutomateFunction';
automationCount: Scalars['Int']['output'];
/** Only returned if user is a part of this speckle server */
creator?: Maybe<LimitedUser>;
description: Scalars['String']['output'];
@@ -307,6 +306,7 @@ export type AutomateFunctionRun = {
export type AutomateFunctionRunStatusReportInput = {
contextView?: InputMaybe<Scalars['String']['input']>;
functionRunId: Scalars['String']['input'];
projectId: Scalars['String']['input'];
/** AutomateTypes.ResultsSchema type from @speckle/shared */
results?: InputMaybe<Scalars['JSONObject']['input']>;
status: AutomateRunStatus;
@@ -5230,7 +5230,6 @@ export type AuthStrategyResolvers<ContextType = GraphQLContext, ParentType exten
};
export type AutomateFunctionResolvers<ContextType = GraphQLContext, ParentType extends ResolversParentTypes['AutomateFunction'] = ResolversParentTypes['AutomateFunction']> = {
automationCount?: Resolver<ResolversTypes['Int'], ParentType, ContextType>;
creator?: Resolver<Maybe<ResolversTypes['LimitedUser']>, ParentType, ContextType>;
description?: Resolver<ResolversTypes['String'], ParentType, ContextType>;
id?: Resolver<ResolversTypes['ID'], ParentType, ContextType>;
@@ -217,7 +217,6 @@ export type AutomateAuthCodePayloadTest = {
export type AutomateFunction = {
__typename?: 'AutomateFunction';
automationCount: Scalars['Int']['output'];
/** Only returned if user is a part of this speckle server */
creator?: Maybe<LimitedUser>;
description: Scalars['String']['output'];
@@ -288,6 +287,7 @@ export type AutomateFunctionRun = {
export type AutomateFunctionRunStatusReportInput = {
contextView?: InputMaybe<Scalars['String']['input']>;
functionRunId: Scalars['String']['input'];
projectId: Scalars['String']['input'];
/** AutomateTypes.ResultsSchema type from @speckle/shared */
results?: InputMaybe<Scalars['JSONObject']['input']>;
status: AutomateRunStatus;
+1 -4
View File
@@ -17,7 +17,6 @@ import { publish } from '@/modules/shared/utils/subscriptions'
import { SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { streamWritePermissionsPipelineFactory } from '@/modules/shared/authz'
import { getRolesFactory } from '@/modules/shared/repositories/roles'
import { getAutomationProjectFactory } from '@/modules/automate/repositories/automations'
import { getStreamBranchByNameFactory } from '@/modules/core/repositories/branches'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { addBranchCreatedActivityFactory } from '@/modules/activitystream/services/branchActivity'
@@ -37,12 +36,10 @@ export const init: SpeckleModule['init'] = async (app, isInitial) => {
app.post(
'/api/file/:fileType/:streamId/:branchName?',
async (req, res, next) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
await authMiddlewareCreator(
streamWritePermissionsPipelineFactory({
getRoles: getRolesFactory({ db }),
getStream: getStreamFactory({ db }),
getAutomationProject: getAutomationProjectFactory({ db: projectDb })
getStream: getStreamFactory({ db })
})
)(req, res, next)
},
+7 -14
View File
@@ -19,7 +19,6 @@ import {
import { isResourceAllowed } from '@/modules/core/helpers/token'
import { UserRoleData } from '@/modules/shared/domain/rolesAndScopes/types'
import db from '@/db/knex'
import { GetAutomationProject } from '@/modules/automate/domain/operations'
import {
AuthContext,
AuthParams,
@@ -209,7 +208,6 @@ type StreamGetter = (params: {
type ValidateRequiredStreamDeps = {
getStream: StreamGetter
getAutomationProject: GetAutomationProject
}
// this doesn't do any checks on the scopes, its sole responsibility is to add the
@@ -219,12 +217,12 @@ export const validateRequiredStreamFactory =
// stream getter is an async func over { streamId, userId } returning a stream object
// IoC baby...
async ({ context, authResult, params }) => {
const { getStream, getAutomationProject } = deps
const { getStream } = deps
if (!params?.streamId && !params?.automationId)
if (!params?.streamId)
return authFailed(
context,
new ContextError("The context doesn't have a streamId or automationId")
new ContextError("The context doesn't have a streamId")
)
// because we're assigning to the context, it would raise if it would be null
// its probably?? safer than returning a new context
@@ -234,15 +232,10 @@ export const validateRequiredStreamFactory =
// cause stream getter could throw, its not a safe function if we want to
// keep the pipeline rolling
try {
const stream = params.streamId
? await getStream({
streamId: params.streamId,
userId: context?.userId
})
: await getAutomationProject({
automationId: params.automationId!,
userId: context?.userId
})
const stream = await getStream({
streamId: params.streamId,
userId: context?.userId
})
if (!stream)
return authFailed(
@@ -27,7 +27,6 @@ export interface AuthResult {
export interface AuthParams {
streamId?: string
automationId?: string
}
export interface AuthData {
@@ -316,7 +316,7 @@ describe('AuthZ @shared', () => {
})
const { authResult } = await step({ params: {} })
expectAuthError(
new ContextError("The context doesn't have a streamId or automationId"),
new ContextError("The context doesn't have a streamId"),
authResult
)
})
@@ -327,7 +327,7 @@ describe('AuthZ @shared', () => {
})
const { authResult } = await step({})
expectAuthError(
new ContextError("The context doesn't have a streamId or automationId"),
new ContextError("The context doesn't have a streamId"),
authResult
)
})
-1
View File
@@ -22,7 +22,6 @@ export const automateFunctionFragment = gql`
commitId
}
}
automationCount
supportedSourceApps
tags
}
File diff suppressed because one or more lines are too long
+1
View File
@@ -99,6 +99,7 @@
"GENDOAI",
"Insertable",
"mjml",
"multiregion",
"OIDC",
"Prorotation"
],