diff --git a/packages/server/modules/acc/domain/operations.ts b/packages/server/modules/acc/domain/operations.ts index c9de3484a..d3ba51e55 100644 --- a/packages/server/modules/acc/domain/operations.ts +++ b/packages/server/modules/acc/domain/operations.ts @@ -1,10 +1,15 @@ -import type { AccSyncItem } from '@/modules/acc/domain/types' +import type { AccSyncItem, AccSyncItemStatus } from '@/modules/acc/domain/types' export type UpsertAccSyncItem = (item: AccSyncItem) => Promise +export type UpdateAccSyncItemStatus = (args: { + id: string + status: AccSyncItemStatus +}) => Promise + export type GetAccSyncItemByUrn = (args: { lineageUrn: string -}) => Promise +}) => Promise export type ListAccSyncItems = (args: { projectId: string diff --git a/packages/server/modules/acc/errors/acc.ts b/packages/server/modules/acc/errors/acc.ts index 99ded8a85..e49265a00 100644 --- a/packages/server/modules/acc/errors/acc.ts +++ b/packages/server/modules/acc/errors/acc.ts @@ -15,3 +15,8 @@ export class SyncItemNotFoundError extends BaseError { static defaultMessage = 'Sync item not found' static code = 'ACC_SYNC_ITEM_NOT_FOUND' } + +export class SyncItemAutomationTriggerError extends BaseError { + static defaultMessage = 'Failed to trigger automation associated with sync item' + static code = 'ACC_SYNC_ITEM_AUTOMATION_TRIGGER_ERROR' +} diff --git a/packages/server/modules/acc/graph/resolvers/accSyncItems.ts b/packages/server/modules/acc/graph/resolvers/accSyncItems.ts index 84b66eff3..495137630 100644 --- a/packages/server/modules/acc/graph/resolvers/accSyncItems.ts +++ b/packages/server/modules/acc/graph/resolvers/accSyncItems.ts @@ -3,6 +3,7 @@ import { deleteAccSyncItemByUrnFactory, getAccSyncItemByUrnFactory, listAccSyncItemsFactory, + updateAccSyncItemStatusFactory, upsertAccSyncItemFactory } from '@/modules/acc/repositories/accSyncItems' import { @@ -19,9 +20,12 @@ import { } from '@/modules/automate/clients/executionEngine' import { getAutomationFactory, + getAutomationTokenFactory, + getLatestAutomationRevisionFactory, storeAutomationFactory, storeAutomationRevisionFactory, - storeAutomationTokenFactory + storeAutomationTokenFactory, + upsertAutomationRunFactory } from '@/modules/automate/repositories/automations' import { createStoredAuthCodeFactory } from '@/modules/automate/services/authCode' import { @@ -44,6 +48,14 @@ import { getGenericRedis } from '@/modules/shared/redis/redis' import { getEventBus } from '@/modules/shared/services/eventBus' import { buildDecryptor } from '@/modules/shared/utils/libsodium' import { db } from '@/db/knex' +import { triggerSyncItemAutomationFactory } from '@/modules/acc/services/automate' +import { createAppTokenFactory } from '@/modules/core/services/tokens' +import { + storeApiTokenFactory, + storeTokenScopesFactory, + storeTokenResourceAccessDefinitionsFactory, + storeUserServerAppTokenFactory +} from '@/modules/core/repositories/tokens' const resolvers: Resolvers = { Mutation: { @@ -84,6 +96,26 @@ const resolvers: Resolvers = { eventEmit: getEventBus().emit, validateStreamAccess: validateStreamAccessFactory({ authorizeResolver }) }), + triggerSyncItemAutomation: triggerSyncItemAutomationFactory({ + updateAccSyncItemStatus: updateAccSyncItemStatusFactory({ db }), + getAutomation: getAutomationFactory({ db: projectDb }), + getLatestAutomationRevision: getLatestAutomationRevisionFactory({ + db: projectDb + }), + upsertAutomationRun: upsertAutomationRunFactory({ db: projectDb }), + createAppToken: createAppTokenFactory({ + storeApiToken: storeApiTokenFactory({ db }), + storeTokenScopes: storeTokenScopesFactory({ db }), + storeTokenResourceAccessDefinitions: + storeTokenResourceAccessDefinitionsFactory({ + db + }), + storeUserServerAppToken: storeUserServerAppTokenFactory({ db }) + }), + getAutomationToken: getAutomationTokenFactory({ + db: projectDb + }) + }), eventEmit: getEventBus().emit })({ syncItem: input, diff --git a/packages/server/modules/acc/repositories/accSyncItems.ts b/packages/server/modules/acc/repositories/accSyncItems.ts index 6ff77afa5..cc05042e5 100644 --- a/packages/server/modules/acc/repositories/accSyncItems.ts +++ b/packages/server/modules/acc/repositories/accSyncItems.ts @@ -5,6 +5,7 @@ import type { GetAccSyncItemByUrn, ListAccSyncItems, QueryAllAccSyncItems, + UpdateAccSyncItemStatus, UpsertAccSyncItem } from '@/modules/acc/domain/operations' import { executeBatchedSelect } from '@/modules/shared/helpers/dbHelper' @@ -19,11 +20,13 @@ const tables = { export const getAccSyncItemByUrnFactory = (deps: { db: Knex }): GetAccSyncItemByUrn => async ({ lineageUrn }) => { - return await tables - .accSyncItems(deps.db) - .select('*') - .where(AccSyncItems.col.accFileLineageUrn, lineageUrn) - .first() + return ( + (await tables + .accSyncItems(deps.db) + .select('*') + .where(AccSyncItems.col.accFileLineageUrn, lineageUrn) + .first()) ?? null + ) } export const upsertAccSyncItemFactory = @@ -41,6 +44,22 @@ export const upsertAccSyncItemFactory = ) } +export const updateAccSyncItemStatusFactory = + (deps: { db: Knex }): UpdateAccSyncItemStatus => + async ({ id, status }) => { + return ( + ( + await tables + .accSyncItems(deps.db) + .update({ + [AccSyncItems.col.status]: status + }) + .where(AccSyncItems.col.id, id) + .returning('*') + ).at(0) ?? null + ) + } + export const deleteAccSyncItemByUrnFactory = (deps: { db: Knex }): DeleteAccSyncItemByUrn => async ({ lineageUrn }) => { diff --git a/packages/server/modules/acc/services/automate.ts b/packages/server/modules/acc/services/automate.ts new file mode 100644 index 000000000..e2d9b1fb8 --- /dev/null +++ b/packages/server/modules/acc/services/automate.ts @@ -0,0 +1,163 @@ +import { ImporterAutomateFunctions } from '@/modules/acc/domain/constants' +import type { UpdateAccSyncItemStatus } from '@/modules/acc/domain/operations' +import type { AccSyncItem } from '@/modules/acc/domain/types' +import { + SyncItemAutomationTriggerError, + SyncItemNotFoundError +} from '@/modules/acc/errors/acc' +import { DefaultAppIds } from '@/modules/auth/defaultApps' +import { triggerAutomationRun } from '@/modules/automate/clients/executionEngine' +import type { + GetAutomation, + GetAutomationToken, + GetLatestAutomationRevision, + UpsertAutomationRun +} from '@/modules/automate/domain/operations' +import type { VersionCreatedTriggerManifest } from '@/modules/automate/helpers/types' +import type { InsertableAutomationRun } from '@/modules/automate/repositories/automations' +import type { CreateAndStoreAppToken } from '@/modules/core/domain/tokens/operations' +import { TokenResourceIdentifierType } from '@/modules/core/graph/generated/graphql' +import { + getAutodeskIntegrationClientId, + getAutodeskIntegrationClientSecret +} from '@/modules/shared/helpers/envHelper' +import { logger } from '@/observability/logging' +import { Scopes } from '@speckle/shared' +import cryptoRandomString from 'crypto-random-string' + +export type TriggerSyncItemAutomation = (args: { id: string }) => Promise + +export const triggerSyncItemAutomationFactory = + (deps: { + updateAccSyncItemStatus: UpdateAccSyncItemStatus + getAutomation: GetAutomation + getLatestAutomationRevision: GetLatestAutomationRevision + upsertAutomationRun: UpsertAutomationRun + getAutomationToken: GetAutomationToken + createAppToken: CreateAndStoreAppToken + }): TriggerSyncItemAutomation => + async ({ id }) => { + const syncItem = await deps.updateAccSyncItemStatus({ + id, + status: 'SYNCING' + }) + + if (!syncItem) { + throw new SyncItemNotFoundError() + } + + const automation = await deps.getAutomation({ + automationId: syncItem.automationId + }) + + if (!automation || !automation.executionEngineAutomationId) { + logger.error( + { syncItem }, + 'Could not find automation {syncItem.automationId} configured for sync item {syncItem.id}' + ) + throw new SyncItemAutomationTriggerError() + } + + const automationRevision = await deps.getLatestAutomationRevision({ + automationId: syncItem.automationId + }) + + if (!automationRevision) { + logger.error( + { syncItem }, + 'Could not find latest revision for automation {syncItem.automationId} configured for sync item {syncItem.id}' + ) + throw new SyncItemAutomationTriggerError() + } + + const runId = cryptoRandomString({ length: 15 }) + + const runData: InsertableAutomationRun = { + id: runId, + automationRevisionId: automationRevision.id, + createdAt: new Date(), + updatedAt: new Date(), + status: 'pending', + executionEngineRunId: null, + triggers: [ + { + // TODO ACC: This is not meaningful until we integrate with fileUpload + triggeringId: '', + triggerType: 'versionCreation' + } + ], + functionRuns: [ + { + id: cryptoRandomString({ length: 15 }), + functionId: ImporterAutomateFunctions.svf2.functionId, + functionReleaseId: ImporterAutomateFunctions.svf2.functionId, + status: 'pending' as const, + elapsed: 0, + results: null, + contextView: null, + statusMessage: null, + createdAt: new Date(), + updatedAt: new Date() + } + ] + } + + await deps.upsertAutomationRun(runData) + + const projectScopedToken = await deps.createAppToken({ + appId: DefaultAppIds.Automate, + name: `acct-${syncItem.id}`, + userId: syncItem.authorId, + scopes: [ + Scopes.Profile.Read, + Scopes.Streams.Read, + Scopes.Streams.Write, + Scopes.Automate.ReportResults + ], + limitResources: [ + { + id: syncItem.projectId, + type: TokenResourceIdentifierType.Project + } + ] + }) + + const automationToken = await deps.getAutomationToken(syncItem.automationId) + + if (!automationToken) { + logger.error( + { syncItem }, + 'Could not find automate token for automation {syncItem.automationId} configured for sync item {syncItem.id}' + ) + throw new SyncItemAutomationTriggerError() + } + + await triggerAutomationRun({ + projectId: syncItem.projectId, + automationId: automation.executionEngineAutomationId, + functionRuns: runData.functionRuns.map((r) => ({ + ...r, + runId: cryptoRandomString({ length: 15 }), + resultVersions: [], + functionInputs: { + projectId: syncItem.projectId, + modelId: syncItem.modelId, + autodeskUrn: syncItem.accFileVersionUrn, + autodeskRegion: syncItem.accRegion === 'EMEA' ? 1 : 0, + autodeskClientId: getAutodeskIntegrationClientId(), + autodeskClientSecret: getAutodeskIntegrationClientSecret() + } + })), + manifests: [ + { + triggerType: 'versionCreation', + modelId: syncItem.modelId, + versionId: cryptoRandomString({ length: 9 }) + } as VersionCreatedTriggerManifest + ], + speckleToken: projectScopedToken, + automationToken: automationToken.automateToken + }) + + return syncItem + } diff --git a/packages/server/modules/acc/services/cron.ts b/packages/server/modules/acc/services/cron.ts index 4113f4c2c..a0327d76c 100644 --- a/packages/server/modules/acc/services/cron.ts +++ b/packages/server/modules/acc/services/cron.ts @@ -1,6 +1,6 @@ import { queryAllPendingAccSyncItemsFactory, - upsertAccSyncItemFactory + updateAccSyncItemStatusFactory } from '@/modules/acc/repositories/accSyncItems' import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations' import { db } from '@/db/knex' @@ -8,18 +8,12 @@ import { getManifestByUrn, getToken } from '@/modules/acc/clients/autodesk' import { isReadyForImport } from '@/modules/acc/domain/logic' import type { Logger } from '@/observability/logging' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' -import { ImporterAutomateFunctions } from '@/modules/acc/domain/constants' -import { DefaultAppIds } from '@/modules/auth/defaultApps' -import { triggerAutomationRun } from '@/modules/automate/clients/executionEngine' -import type { VersionCreatedTriggerManifest } from '@/modules/automate/helpers/types' -import type { InsertableAutomationRun } from '@/modules/automate/repositories/automations' import { getAutomationFactory, getLatestAutomationRevisionFactory, upsertAutomationRunFactory, getAutomationTokenFactory } from '@/modules/automate/repositories/automations' -import { TokenResourceIdentifierType } from '@/modules/core/graph/generated/graphql' import { storeApiTokenFactory, storeTokenScopesFactory, @@ -27,16 +21,11 @@ import { storeUserServerAppTokenFactory } from '@/modules/core/repositories/tokens' import { createAppTokenFactory } from '@/modules/core/services/tokens' -import { - getAutodeskIntegrationClientId, - getAutodeskIntegrationClientSecret -} from '@/modules/shared/helpers/envHelper' -import { Scopes, TIME_MS } from '@speckle/shared' -import cryptoRandomString from 'crypto-random-string' +import { TIME_MS } from '@speckle/shared' import type { AccRegion } from '@/modules/acc/domain/types' +import { triggerSyncItemAutomationFactory } from '@/modules/acc/services/automate' const queryAllPendingAccSyncItems = queryAllPendingAccSyncItemsFactory({ db }) -const upsertAccSyncItem = upsertAccSyncItemFactory({ db }) export const schedulePendingSyncItemsCheck = (deps: { scheduleExecution: ScheduleExecution @@ -61,116 +50,27 @@ export const schedulePendingSyncItemsCheck = (deps: { if (!isReady) continue - await upsertAccSyncItem({ - ...syncItem, - status: 'SYNCING' - }) - const projectDb = await getProjectDbClient({ projectId: syncItem.projectId }) - const automation = await getAutomationFactory({ db: projectDb })({ - automationId: syncItem.automationId - }) - - if (!automation || !automation.executionEngineAutomationId) continue - - const automationRevision = await getLatestAutomationRevisionFactory({ - db: projectDb - })({ automationId: syncItem.automationId }) - - if (!automationRevision) continue - - const runId = cryptoRandomString({ length: 15 }) - - const runData: InsertableAutomationRun = { - id: runId, - automationRevisionId: automationRevision.id, - createdAt: new Date(), - updatedAt: new Date(), - status: 'pending', - executionEngineRunId: null, - triggers: [ - { - // TODO ACC: This is not meaningful until we integrate with fileUpload - triggeringId: '', - triggerType: 'versionCreation' - } - ], - functionRuns: [ - { - id: cryptoRandomString({ length: 15 }), - functionId: ImporterAutomateFunctions.svf2.functionId, - functionReleaseId: ImporterAutomateFunctions.svf2.functionId, - status: 'pending' as const, - elapsed: 0, - results: null, - contextView: null, - statusMessage: null, - createdAt: new Date(), - updatedAt: new Date() - } - ] - } - - await upsertAutomationRunFactory({ db: projectDb })(runData) - - const projectScopedToken = await createAppTokenFactory({ - storeApiToken: storeApiTokenFactory({ db }), - storeTokenScopes: storeTokenScopesFactory({ db }), - storeTokenResourceAccessDefinitions: - storeTokenResourceAccessDefinitionsFactory({ - db - }), - storeUserServerAppToken: storeUserServerAppTokenFactory({ db }) - })({ - appId: DefaultAppIds.Automate, - name: `acct-${syncItem.id}`, - userId: syncItem.authorId, - scopes: [ - Scopes.Profile.Read, - Scopes.Streams.Read, - Scopes.Streams.Write, - Scopes.Automate.ReportResults - ], - limitResources: [ - { - id: syncItem.projectId, - type: TokenResourceIdentifierType.Project - } - ] - }) - - const automationToken = await getAutomationTokenFactory({ db: projectDb })( - syncItem.automationId - ) - - if (!automationToken) continue - - await triggerAutomationRun({ - projectId: syncItem.projectId, - automationId: automation.executionEngineAutomationId, - functionRuns: runData.functionRuns.map((r) => ({ - ...r, - runId: cryptoRandomString({ length: 15 }), - resultVersions: [], - functionInputs: { - projectId: syncItem.projectId, - modelId: syncItem.modelId, - autodeskUrn: syncItem.accFileVersionUrn, - autodeskRegion: syncItem.accRegion === 'EMEA' ? 1 : 0, - autodeskClientId: getAutodeskIntegrationClientId(), - autodeskClientSecret: getAutodeskIntegrationClientSecret() - } - })), - manifests: [ - { - triggerType: 'versionCreation', - modelId: syncItem.modelId, - versionId: cryptoRandomString({ length: 9 }) - } as VersionCreatedTriggerManifest - ], - speckleToken: projectScopedToken, - automationToken: automationToken.automateToken + await triggerSyncItemAutomationFactory({ + updateAccSyncItemStatus: updateAccSyncItemStatusFactory({ db }), + getAutomation: getAutomationFactory({ db: projectDb }), + getLatestAutomationRevision: getLatestAutomationRevisionFactory({ + db: projectDb + }), + upsertAutomationRun: upsertAutomationRunFactory({ db: projectDb }), + createAppToken: createAppTokenFactory({ + storeApiToken: storeApiTokenFactory({ db }), + storeTokenScopes: storeTokenScopesFactory({ db }), + storeTokenResourceAccessDefinitions: + storeTokenResourceAccessDefinitionsFactory({ + db + }), + storeUserServerAppToken: storeUserServerAppTokenFactory({ db }) + }), + getAutomationToken: getAutomationTokenFactory({ + db: projectDb + }) }) } } diff --git a/packages/server/modules/acc/services/management.ts b/packages/server/modules/acc/services/management.ts index c8a5a1420..87e04ac18 100644 --- a/packages/server/modules/acc/services/management.ts +++ b/packages/server/modules/acc/services/management.ts @@ -1,6 +1,11 @@ -import { tryRegisterAccWebhook } from '@/modules/acc/clients/autodesk' +import { + getManifestByUrn, + getToken, + tryRegisterAccWebhook +} from '@/modules/acc/clients/autodesk' import { ImporterAutomateFunctions } from '@/modules/acc/domain/constants' import { AccSyncItemEvents } from '@/modules/acc/domain/events' +import { isReadyForImport } from '@/modules/acc/domain/logic' import type { CountAccSyncItems, DeleteAccSyncItemByUrn, @@ -10,6 +15,7 @@ import type { } from '@/modules/acc/domain/operations' import type { AccRegion, AccSyncItem } from '@/modules/acc/domain/types' import { DuplicateSyncItemError, SyncItemNotFoundError } from '@/modules/acc/errors/acc' +import type { TriggerSyncItemAutomation } from '@/modules/acc/services/automate' import type { CreateAutomation, CreateAutomationRevision @@ -46,6 +52,7 @@ export const createAccSyncItemFactory = upsertAccSyncItem: UpsertAccSyncItem createAutomation: CreateAutomation createAutomationRevision: CreateAutomationRevision + triggerSyncItemAutomation: TriggerSyncItemAutomation eventEmit: EventBusEmit }): CreateAccSyncItem => async ({ syncItem, creatorUserId }) => { @@ -121,7 +128,20 @@ export const createAccSyncItemFactory = } }) - return newSyncItem + // Import new sync item immediately, if possible + const tokenData = await getToken() + const manifest = await getManifestByUrn( + { + urn: newSyncItem.accFileVersionUrn, + region: newSyncItem.accRegion as AccRegion + }, + { token: tokenData.access_token } + ) + const isReady = isReadyForImport(manifest) + + if (!isReady) return newSyncItem + + return await deps.triggerSyncItemAutomation({ id: newSyncItem.id }) } export type GetAccSyncItem = (params: { lineageUrn: string }) => Promise