From 04f84c31f36da3d8eb828c7262a040e4c5ecee2e Mon Sep 17 00:00:00 2001 From: Iain Sproat <68657+iainsproat@users.noreply.github.com> Date: Fri, 28 Feb 2025 18:22:00 +0000 Subject: [PATCH] fix(server/automate): logger should have request context and request ID sent to execution engine (#4092) * fix(server/automate): logger should have request context * WIP - pass in request Id * WIP * fix(automate): better logging for automate processes * chore(automate): slight log improvement * fix(automate): just in case --------- Co-authored-by: Charles Driesler --- .../automate/clients/executionEngine.ts | 350 ++++++++++-------- .../automate/graph/resolvers/automate.ts | 42 ++- packages/server/modules/automate/index.ts | 4 +- .../modules/automate/services/authCode.ts | 16 +- .../automate/services/automationManagement.ts | 12 +- .../automate/services/functionManagement.ts | 11 +- .../modules/core/graph/dataloaders/index.ts | 9 +- .../workspaces/graph/resolvers/workspaces.ts | 6 +- 8 files changed, 255 insertions(+), 195 deletions(-) diff --git a/packages/server/modules/automate/clients/executionEngine.ts b/packages/server/modules/automate/clients/executionEngine.ts index d29219d2a..3f0349894 100644 --- a/packages/server/modules/automate/clients/executionEngine.ts +++ b/packages/server/modules/automate/clients/executionEngine.ts @@ -28,6 +28,8 @@ import { retry, timeoutAt } from '@speckle/shared' +import { randomUUID } from 'crypto' +import { Logger } from 'pino' import { has, isObjectLike, isEmpty } from 'lodash' export type AuthCodePayloadWithOrigin = AuthCodePayload & { origin: string } @@ -75,22 +77,23 @@ const getApiUrl = ( return url.toString() } -const invokeSafeJsonRequest = async < - Response extends Record = Record ->( - ...args: Parameters -): Promise => { - const [{ url, method }] = args - try { - return await invokeJsonRequest(...args) - } catch (e) { - automateLogger.error( - { url, method, err: e }, - 'Automate API request error suppressed.' - ) - return null +const invokeSafeJsonRequestFactory = + = Record>(deps: { + logger: Logger + }) => + async (...args: Parameters): Promise => { + const { logger } = deps + const [{ url, method }] = args + try { + return await invokeJsonRequest({ + ...args[0], + requestId: logger.bindings?.()?.req?.id + }) + } catch (e) { + logger.error({ url, method, err: e }, 'Automate API request error suppressed.') + return null + } } -} const invokeJsonRequest = async >( ...args: Parameters @@ -110,10 +113,11 @@ const invokeRequest = async (params: { url: string method?: RequestInit['method'] body?: Record + requestId?: string token?: string retry?: boolean }) => { - const { url, method = 'get', body, token } = params + const { url, method = 'get', body, token, requestId } = params const response = await retry( async () => @@ -122,6 +126,7 @@ const invokeRequest = async (params: { method, headers: { 'Content-Type': 'application/json', + 'X-Request-Id': requestId ?? randomUUID(), ...(token?.length ? { Authorization: `Bearer ${token}` } : {}) }, body: body && isObjectLike(body) ? JSON.stringify(body) : undefined @@ -388,78 +393,91 @@ export type GetFunctionResponse = FunctionWithVersionsSchemaType & { versionCursor: Nullable } -export const getFunction = async (params: { - functionId: string - token?: string - releases?: { cursor?: string; limit?: number; versionsFilter?: string } -}) => { - const { functionId, token } = params - const query = Object.values(params.releases || {}).filter(isNonNullable).length - ? params.releases - : undefined +export const getFunctionFactory = + (deps: { logger: Logger }) => + async (params: { + functionId: string + token?: string + releases?: { cursor?: string; limit?: number; versionsFilter?: string } + }) => { + const { logger } = deps + const { functionId, token } = params + const query = Object.values(params.releases || {}).filter(isNonNullable).length + ? params.releases + : undefined - const url = getApiUrl(`/api/v1/functions/${functionId}`, { - query - }) + const url = getApiUrl(`/api/v1/functions/${functionId}`, { + query + }) - return await invokeSafeJsonRequest({ - url, - method: 'get', - token - }) -} + return await invokeSafeJsonRequestFactory({ + logger + })({ + url, + method: 'get', + token + }) + } export type GetFunctionReleaseResponse = FunctionReleaseSchemaType /** * TODO: Build optimized exec engine endpoint for this */ -export const getFunctionReleases = async (params: { - ids: Array<{ functionId: string; functionReleaseId: string }> -}) => { - const { ids } = params - const results = await Promise.all( - ids.map(async ({ functionId, functionReleaseId }) => { - try { - return await getFunctionRelease({ functionId, functionReleaseId }) - } catch (e) { - if (e instanceof ExecutionEngineNetworkError) { - return null - } - if ( - e instanceof ExecutionEngineFailedResponseError && - e.response.statusMessage === 'FunctionNotFound' - ) { - return null - } +export const getFunctionReleasesFactory = + (deps: { logger: Logger }) => + async (params: { ids: Array<{ functionId: string; functionReleaseId: string }> }) => { + const { logger } = deps + const { ids } = params + const results = await Promise.all( + ids.map(async ({ functionId, functionReleaseId }) => { + try { + return await getFunctionReleaseFactory({ logger })({ + functionId, + functionReleaseId + }) + } catch (e) { + if (e instanceof ExecutionEngineNetworkError) { + return null + } + if ( + e instanceof ExecutionEngineFailedResponseError && + e.response.statusMessage === 'FunctionNotFound' + ) { + return null + } - throw e - } + throw e + } + }) + ) + + return results.filter(isNonNullable) + } + +export const getFunctionReleaseFactory = + (deps: { logger: Logger }) => + async (params: { functionId: string; functionReleaseId: string }) => { + const { logger } = deps + const { functionId, functionReleaseId } = params + const url = getApiUrl( + `/api/v1/functions/${functionId}/versions/${functionReleaseId}` + ) + + const result = await invokeSafeJsonRequestFactory({ + logger + })({ + url, + method: 'get' }) - ) - return results.filter(isNonNullable) -} - -export const getFunctionRelease = async (params: { - functionId: string - functionReleaseId: string -}) => { - const { functionId, functionReleaseId } = params - const url = getApiUrl(`/api/v1/functions/${functionId}/versions/${functionReleaseId}`) - - const result = await invokeSafeJsonRequest({ - url, - method: 'get' - }) - - return result - ? { - ...result, - functionId - } - : null -} + return result + ? { + ...result, + functionId + } + : null + } export type GetFunctionsParams = { auth?: AuthCodePayload @@ -480,29 +498,32 @@ export type GetFunctionsResponse = { totalCount: number } -export const getFunctions = async (params: GetFunctionsParams) => { - const url = getApiUrl(`/api/v2/functions`, { - query: { - requireRelease: true, - ...params.filters - } - }) +export const getFunctionsFactory = + (deps: { logger: Logger }) => async (params: GetFunctionsParams) => { + const { logger } = deps - const authToken = params.auth - ? Buffer.from( - JSON.stringify({ - ...params.auth, - origin: getServerOrigin() - }) - ).toString('base64') - : undefined + const url = getApiUrl(`/api/v2/functions`, { + query: { + requireRelease: true, + ...params.filters + } + }) - return await invokeSafeJsonRequest({ - url, - method: 'get', - token: authToken - }) -} + const authToken = params.auth + ? Buffer.from( + JSON.stringify({ + ...params.auth, + origin: getServerOrigin() + }) + ).toString('base64') + : undefined + + return await invokeSafeJsonRequestFactory({ logger })({ + url, + method: 'get', + token: authToken + }) + } export type GetPublicFunctionsResponse = { totalCount: number @@ -510,79 +531,94 @@ export type GetPublicFunctionsResponse = { items: FunctionWithVersionsSchemaType[] } -export const getPublicFunctions = async (params: { - query?: { - query?: string - cursor?: string - limit?: number - functionsWithoutVersions?: boolean - } -}) => { - const { query } = params - const url = getApiUrl(`/api/v1/functions`, { - query: { - ...query, - featuredFunctionsOnly: true +export const getPublicFunctionsFactory = + (deps: { logger: Logger }) => + async (params: { + query?: { + query?: string + cursor?: string + limit?: number + functionsWithoutVersions?: boolean } - }) + }) => { + const { logger } = deps + const { query } = params + const url = getApiUrl(`/api/v1/functions`, { + query: { + ...query, + featuredFunctionsOnly: true + } + }) - return await invokeSafeJsonRequest({ - url, - method: 'get' - }) -} + return await invokeSafeJsonRequestFactory({ + logger + })({ + url, + method: 'get' + }) + } type GetUserFunctionsResponse = { functions: FunctionWithVersionsSchemaType[] } -export const getUserFunctions = async (params: { - userId: string - query?: { - query?: string - cursor?: string - limit?: number - } - body: { - speckleServerAuthenticationPayload: AuthCodePayloadWithOrigin - } -}) => { - const { userId, query, body } = params - const url = getApiUrl(`/api/v2/users/${userId}/functions`, { query }) +export const getUserFunctionsFactory = + (deps: { logger: Logger }) => + async (params: { + userId: string + query?: { + query?: string + cursor?: string + limit?: number + } + body: { + speckleServerAuthenticationPayload: AuthCodePayloadWithOrigin + } + }) => { + const { logger } = deps + const { userId, query, body } = params + const url = getApiUrl(`/api/v2/users/${userId}/functions`, { query }) - return await invokeSafeJsonRequest({ - url, - method: 'POST', - body, - retry: false - }) -} + return await invokeSafeJsonRequestFactory({ + logger + })({ + url, + method: 'POST', + body, + retry: false + }) + } type GetWorkspaceFunctionsResponse = { functions: FunctionWithVersionsSchemaType[] } -export const getWorkspaceFunctions = async (params: { - workspaceId: string - query?: { - query?: string - cursor?: string - limit?: number - } - body: { - speckleServerAuthenticationPayload: AuthCodePayloadWithOrigin - } -}) => { - const { workspaceId, query, body } = params - const url = getApiUrl(`/api/v2/workspaces/${workspaceId}/functions`, { query }) +export const getWorkspaceFunctionsFactory = + (deps: { logger: Logger }) => + async (params: { + workspaceId: string + query?: { + query?: string + cursor?: string + limit?: number + } + body: { + speckleServerAuthenticationPayload: AuthCodePayloadWithOrigin + } + }) => { + const { logger } = deps + const { workspaceId, query, body } = params + const url = getApiUrl(`/api/v2/workspaces/${workspaceId}/functions`, { query }) - return await invokeSafeJsonRequest({ - url, - method: 'POST', - body, - retry: false - }) -} + return await invokeSafeJsonRequestFactory({ + logger + })({ + url, + method: 'POST', + body, + retry: false + }) + } type UserGithubAuthStateResponse = { userHasAuthorizedGitHubApp: boolean diff --git a/packages/server/modules/automate/graph/resolvers/automate.ts b/packages/server/modules/automate/graph/resolvers/automate.ts index 0eec4ba48..ea9fb53b6 100644 --- a/packages/server/modules/automate/graph/resolvers/automate.ts +++ b/packages/server/modules/automate/graph/resolvers/automate.ts @@ -3,13 +3,13 @@ import { createFunctionWithoutVersion, triggerAutomationRun, updateFunction as execEngineUpdateFunction, - getFunction, - getFunctionRelease, - getPublicFunctions, - getFunctionReleases, + getFunctionFactory, + getFunctionReleaseFactory, + getPublicFunctionsFactory, + getFunctionReleasesFactory, getUserGithubAuthState, getUserGithubOrganizations, - getUserFunctions + getUserFunctionsFactory } from '@/modules/automate/clients/executionEngine' import { GetProjectAutomationsParams, @@ -459,10 +459,12 @@ export = (FF_AUTOMATE_MODULE_ENABLED } }, AutomateFunction: { - async releases(parent, args) { + async releases(parent, args, context) { try { // TODO: Replace w/ dataloader batch call, when/if possible - const fn = await getFunction({ + const fn = await getFunctionFactory({ + logger: context.log + })({ functionId: parent.id, releases: args?.cursor || args?.filter?.search || args?.limit @@ -567,10 +569,11 @@ export = (FF_AUTOMATE_MODULE_ENABLED async updateFunction(_parent, args, ctx) { const update = updateFunctionFactory({ updateFunction: execEngineUpdateFunction, - getFunction, + getFunction: getFunctionFactory({ logger: ctx.log }), createStoredAuthCode: createStoredAuthCodeFactory({ redis: getGenericRedis() - }) + }), + logger: ctx.log }) return await update({ input: args.input, userId: ctx.userId! }) } @@ -621,12 +624,12 @@ export = (FF_AUTOMATE_MODULE_ENABLED getAutomation: getAutomationFactory({ db: projectDb }), storeAutomationRevision: storeAutomationRevisionFactory({ db: projectDb }), getBranchesByIds: getBranchesByIdsFactory({ db: projectDb }), - getFunctionRelease, + getFunctionRelease: getFunctionReleaseFactory({ logger: ctx.log }), getEncryptionKeyPair, getFunctionInputDecryptor: getFunctionInputDecryptorFactory({ buildDecryptor }), - getFunctionReleases, + getFunctionReleases: getFunctionReleasesFactory({ logger: ctx.log }), eventEmit: getEventBus().emit, validateStreamAccess }) @@ -679,7 +682,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED const create = createTestAutomationFactory({ getEncryptionKeyPair, - getFunction, + getFunction: getFunctionFactory({ logger: ctx.log }), storeAutomation: storeAutomationFactory({ db: projectDb }), storeAutomationRevision: storeAutomationRevisionFactory({ db: projectDb }), validateStreamAccess, @@ -730,10 +733,11 @@ export = (FF_AUTOMATE_MODULE_ENABLED } }, Query: { - async automateValidateAuthCode(_parent, args) { + async automateValidateAuthCode(_parent, args, ctx) { const validate = validateStoredAuthCodeFactory({ redis: getGenericRedis(), - emit: getEventBus().emit + emit: getEventBus().emit, + logger: ctx.log }) const payload = removeNullOrUndefinedKeys(args.payload) const resources = removeNullOrUndefinedKeys(args.resources ?? {}) @@ -755,9 +759,11 @@ export = (FF_AUTOMATE_MODULE_ENABLED return convertFunctionToGraphQLReturn(fn) }, - async automateFunctions(_parent, args) { + async automateFunctions(_parent, args, ctx) { try { - const res = await getPublicFunctions({ + const res = await getPublicFunctionsFactory({ + logger: ctx.log + })({ query: { query: args.filter?.search || undefined, cursor: args.cursor || undefined, @@ -809,7 +815,9 @@ export = (FF_AUTOMATE_MODULE_ENABLED action: AuthCodePayloadAction.ListUserFunctions }) - const res = await getUserFunctions({ + const res = await getUserFunctionsFactory({ + logger: context.log + })({ userId: context.userId!, query: { query: args.filter?.search || undefined, diff --git a/packages/server/modules/automate/index.ts b/packages/server/modules/automate/index.ts index 5ed5c1bb9..0ce22f83a 100644 --- a/packages/server/modules/automate/index.ts +++ b/packages/server/modules/automate/index.ts @@ -17,7 +17,7 @@ import { import { isNonNullable, Scopes } from '@speckle/shared' import { registerOrUpdateScopeFactory } from '@/modules/shared/repositories/scopes' import { - getFunction, + getFunctionFactory, triggerAutomationRun } from '@/modules/automate/clients/executionEngine' import logStreamRest from '@/modules/automate/rest/logStream' @@ -289,7 +289,7 @@ const initializeEventListeners = () => { const fn = isTestEnv() ? null - : await getFunction({ functionId: functionRun.functionId }) + : await getFunctionFactory({ logger })({ functionId: functionRun.functionId }) const userEmail = await getUserEmailFromAutomationRunFactory({ getFullAutomationRevisionMetadata: getFullAutomationRevisionMetadataFactory({ diff --git a/packages/server/modules/automate/services/authCode.ts b/packages/server/modules/automate/services/authCode.ts index 8c89bc783..cab3950bd 100644 --- a/packages/server/modules/automate/services/authCode.ts +++ b/packages/server/modules/automate/services/authCode.ts @@ -5,6 +5,7 @@ import { EventBus } from '@/modules/shared/services/eventBus' import cryptoRandomString from 'crypto-random-string' import Redis from 'ioredis' import { get, has, isObjectLike } from 'lodash' +import { Logger } from 'pino' export enum AuthCodePayloadAction { CreateAutomation = 'createAutomation', @@ -47,14 +48,14 @@ export const createStoredAuthCodeFactory = } export const validateStoredAuthCodeFactory = - (deps: { redis: Redis; emit: EventBus['emit'] }) => + (deps: { redis: Redis; logger: Logger; emit: EventBus['emit'] }) => async (params: { payload: AuthCodePayload resources?: { workspaceId?: string } }) => { - const { redis, emit } = deps + const { redis, logger, emit } = deps const { payload, resources } = params const potentialPayloadString = await redis.get(payload.code) @@ -63,6 +64,17 @@ export const validateStoredAuthCodeFactory = : null const formattedPayload = isPayload(potentialPayload) ? potentialPayload : null + logger.info( + { + payloadString: potentialPayloadString, + payload: { + ...formattedPayload, + code: null + } + }, + 'Validating execution engine request with provided auth payload.' + ) + if ( !formattedPayload || formattedPayload.code !== payload.code || diff --git a/packages/server/modules/automate/services/automationManagement.ts b/packages/server/modules/automate/services/automationManagement.ts index fe8a8c703..9b11e64ee 100644 --- a/packages/server/modules/automate/services/automationManagement.ts +++ b/packages/server/modules/automate/services/automationManagement.ts @@ -7,9 +7,9 @@ import { getServerOrigin } from '@/modules/shared/helpers/envHelper' import cryptoRandomString from 'crypto-random-string' import { createAutomation as clientCreateAutomation, - getFunction, - getFunctionRelease, - getFunctionReleases + getFunctionFactory, + getFunctionReleaseFactory, + getFunctionReleasesFactory } from '@/modules/automate/clients/executionEngine' import { Automate, Roles, removeNullOrUndefinedKeys } from '@speckle/shared' import { AuthCodePayloadAction } from '@/modules/automate/services/authCode' @@ -139,7 +139,7 @@ export const createAutomationFactory = export type CreateTestAutomationDeps = { getEncryptionKeyPair: GetEncryptionKeyPair - getFunction: typeof getFunction + getFunction: ReturnType storeAutomation: StoreAutomation storeAutomationRevision: StoreAutomationRevision validateStreamAccess: ValidateStreamAccess @@ -356,7 +356,7 @@ const validateNewTriggerDefinitions = } type ValidateNewRevisionFunctionsDeps = { - getFunctionRelease: typeof getFunctionRelease + getFunctionRelease: ReturnType } const validateNewRevisionFunctions = @@ -399,7 +399,7 @@ export type CreateAutomationRevisionDeps = { storeAutomationRevision: StoreAutomationRevision getEncryptionKeyPair: GetEncryptionKeyPair getFunctionInputDecryptor: FunctionInputDecryptor - getFunctionReleases: typeof getFunctionReleases + getFunctionReleases: ReturnType validateStreamAccess: ValidateStreamAccess eventEmit: EventBusEmit } & ValidateNewTriggerDefinitionsDeps & diff --git a/packages/server/modules/automate/services/functionManagement.ts b/packages/server/modules/automate/services/functionManagement.ts index e71ad02ac..907c4255a 100644 --- a/packages/server/modules/automate/services/functionManagement.ts +++ b/packages/server/modules/automate/services/functionManagement.ts @@ -2,7 +2,7 @@ import { CreateFunctionBody, ExecutionEngineFunctionTemplateId, createFunction, - getFunction, + getFunctionFactory, updateFunction as updateExecEngineFunction } from '@/modules/automate/clients/executionEngine' import { @@ -43,7 +43,7 @@ import { speckleAutomateUrl } from '@/modules/shared/helpers/envHelper' import { getFunctionsMarketplaceUrl } from '@/modules/core/helpers/routeHelper' -import { automateLogger } from '@/observability/logging' +import { automateLogger, Logger } from '@/observability/logging' import { CreateStoredAuthCode } from '@/modules/automate/domain/operations' import { GetUser } from '@/modules/core/domain/users/operations' import { noop } from 'lodash' @@ -195,17 +195,18 @@ export const createFunctionFromTemplateFactory = export type UpdateFunctionDeps = { updateFunction: typeof updateExecEngineFunction - getFunction: typeof getFunction + getFunction: ReturnType createStoredAuthCode: CreateStoredAuthCode + logger: Logger } export const updateFunctionFactory = (deps: UpdateFunctionDeps) => async (params: { input: UpdateAutomateFunctionInput; userId: string }) => { - const { updateFunction, createStoredAuthCode } = deps + const { updateFunction, createStoredAuthCode, logger } = deps const { input, userId } = params - const existingFn = await getFunction({ functionId: input.id }) + const existingFn = await getFunctionFactory({ logger })({ functionId: input.id }) if (!existingFn) { throw new AutomateFunctionUpdateError('Function not found') } diff --git a/packages/server/modules/core/graph/dataloaders/index.ts b/packages/server/modules/core/graph/dataloaders/index.ts index f6ed23fd1..d98183a9b 100644 --- a/packages/server/modules/core/graph/dataloaders/index.ts +++ b/packages/server/modules/core/graph/dataloaders/index.ts @@ -71,8 +71,8 @@ import { getRevisionsTriggerDefinitionsFactory } from '@/modules/automate/repositories/automations' import { - getFunction, - getFunctionReleases + getFunctionFactory, + getFunctionReleasesFactory } from '@/modules/automate/clients/executionEngine' import { FunctionReleaseSchemaType, @@ -96,6 +96,7 @@ import { CommitWithStreamBranchId, CommitWithStreamBranchMetadata } from '@/modules/core/domain/commits/types' +import { logger } from '@/observability/logging' declare module '@/modules/core/loaders' { interface ModularizedDataLoaders extends ReturnType {} @@ -618,7 +619,7 @@ const dataLoadersDefinition = defineRequestDataloaders( const results = await Promise.all( fnIds.map(async (fnId) => { try { - return await getFunction({ functionId: fnId }) + return await getFunctionFactory({ logger })({ functionId: fnId }) } catch (e) { const isNotFound = e instanceof ExecutionEngineFailedResponseError && @@ -642,7 +643,7 @@ const dataLoadersDefinition = defineRequestDataloaders( >( async (keys) => { const results = keyBy( - await getFunctionReleases({ + await getFunctionReleasesFactory({ logger })({ ids: keys.map(([fnId, fnReleaseId]) => ({ functionId: fnId, functionReleaseId: fnReleaseId diff --git a/packages/server/modules/workspaces/graph/resolvers/workspaces.ts b/packages/server/modules/workspaces/graph/resolvers/workspaces.ts index 3090fbf10..2204097d5 100644 --- a/packages/server/modules/workspaces/graph/resolvers/workspaces.ts +++ b/packages/server/modules/workspaces/graph/resolvers/workspaces.ts @@ -169,7 +169,7 @@ import { listWorkspaceSsoMembershipsFactory } from '@/modules/workspaces/repositories/sso' import { getDecryptor } from '@/modules/workspaces/helpers/sso' -import { getFunctions } from '@/modules/automate/clients/executionEngine' +import { getFunctionsFactory } from '@/modules/automate/clients/executionEngine' import { ExecutionEngineFailedResponseError, ExecutionEngineNetworkError @@ -1093,7 +1093,9 @@ export = FF_WORKSPACES_MODULE_ENABLED action: AuthCodePayloadAction.ListWorkspaceFunctions }) - const res = await getFunctions({ + const res = await getFunctionsFactory({ + logger: context.log + })({ auth: authCode, filters: { query: args.filter?.search ?? undefined,