fix(server/automate): logger should have request context and request ID sent to execution engine (#4092)

* fix(server/automate): logger should have request context

* WIP - pass in request Id

* WIP

* fix(automate): better logging for automate processes

* chore(automate): slight log improvement

* fix(automate): just in case

---------

Co-authored-by: Charles Driesler <chuck@speckle.systems>
This commit is contained in:
Iain Sproat
2025-02-28 18:22:00 +00:00
committed by GitHub
parent 6d153bd485
commit 04f84c31f3
8 changed files with 255 additions and 195 deletions
@@ -28,6 +28,8 @@ import {
retry,
timeoutAt
} from '@speckle/shared'
import { randomUUID } from 'crypto'
import { Logger } from 'pino'
import { has, isObjectLike, isEmpty } from 'lodash'
export type AuthCodePayloadWithOrigin = AuthCodePayload & { origin: string }
@@ -75,22 +77,23 @@ const getApiUrl = (
return url.toString()
}
const invokeSafeJsonRequest = async <
Response extends Record<string, unknown> = Record<string, unknown>
>(
...args: Parameters<typeof invokeRequest>
): Promise<Response | null> => {
const [{ url, method }] = args
try {
return await invokeJsonRequest<Response>(...args)
} catch (e) {
automateLogger.error(
{ url, method, err: e },
'Automate API request error suppressed.'
)
return null
const invokeSafeJsonRequestFactory =
<Response extends Record<string, unknown> = Record<string, unknown>>(deps: {
logger: Logger
}) =>
async (...args: Parameters<typeof invokeRequest>): Promise<Response | null> => {
const { logger } = deps
const [{ url, method }] = args
try {
return await invokeJsonRequest<Response>({
...args[0],
requestId: logger.bindings?.()?.req?.id
})
} catch (e) {
logger.error({ url, method, err: e }, 'Automate API request error suppressed.')
return null
}
}
}
const invokeJsonRequest = async <R = Record<string, unknown>>(
...args: Parameters<typeof invokeRequest>
@@ -110,10 +113,11 @@ const invokeRequest = async (params: {
url: string
method?: RequestInit['method']
body?: Record<string, unknown>
requestId?: string
token?: string
retry?: boolean
}) => {
const { url, method = 'get', body, token } = params
const { url, method = 'get', body, token, requestId } = params
const response = await retry(
async () =>
@@ -122,6 +126,7 @@ const invokeRequest = async (params: {
method,
headers: {
'Content-Type': 'application/json',
'X-Request-Id': requestId ?? randomUUID(),
...(token?.length ? { Authorization: `Bearer ${token}` } : {})
},
body: body && isObjectLike(body) ? JSON.stringify(body) : undefined
@@ -388,78 +393,91 @@ export type GetFunctionResponse = FunctionWithVersionsSchemaType & {
versionCursor: Nullable<string>
}
export const getFunction = async (params: {
functionId: string
token?: string
releases?: { cursor?: string; limit?: number; versionsFilter?: string }
}) => {
const { functionId, token } = params
const query = Object.values(params.releases || {}).filter(isNonNullable).length
? params.releases
: undefined
export const getFunctionFactory =
(deps: { logger: Logger }) =>
async (params: {
functionId: string
token?: string
releases?: { cursor?: string; limit?: number; versionsFilter?: string }
}) => {
const { logger } = deps
const { functionId, token } = params
const query = Object.values(params.releases || {}).filter(isNonNullable).length
? params.releases
: undefined
const url = getApiUrl(`/api/v1/functions/${functionId}`, {
query
})
const url = getApiUrl(`/api/v1/functions/${functionId}`, {
query
})
return await invokeSafeJsonRequest<GetFunctionResponse>({
url,
method: 'get',
token
})
}
return await invokeSafeJsonRequestFactory<GetFunctionResponse>({
logger
})({
url,
method: 'get',
token
})
}
export type GetFunctionReleaseResponse = FunctionReleaseSchemaType
/**
* TODO: Build optimized exec engine endpoint for this
*/
export const getFunctionReleases = async (params: {
ids: Array<{ functionId: string; functionReleaseId: string }>
}) => {
const { ids } = params
const results = await Promise.all(
ids.map(async ({ functionId, functionReleaseId }) => {
try {
return await getFunctionRelease({ functionId, functionReleaseId })
} catch (e) {
if (e instanceof ExecutionEngineNetworkError) {
return null
}
if (
e instanceof ExecutionEngineFailedResponseError &&
e.response.statusMessage === 'FunctionNotFound'
) {
return null
}
export const getFunctionReleasesFactory =
(deps: { logger: Logger }) =>
async (params: { ids: Array<{ functionId: string; functionReleaseId: string }> }) => {
const { logger } = deps
const { ids } = params
const results = await Promise.all(
ids.map(async ({ functionId, functionReleaseId }) => {
try {
return await getFunctionReleaseFactory({ logger })({
functionId,
functionReleaseId
})
} catch (e) {
if (e instanceof ExecutionEngineNetworkError) {
return null
}
if (
e instanceof ExecutionEngineFailedResponseError &&
e.response.statusMessage === 'FunctionNotFound'
) {
return null
}
throw e
}
throw e
}
})
)
return results.filter(isNonNullable)
}
export const getFunctionReleaseFactory =
(deps: { logger: Logger }) =>
async (params: { functionId: string; functionReleaseId: string }) => {
const { logger } = deps
const { functionId, functionReleaseId } = params
const url = getApiUrl(
`/api/v1/functions/${functionId}/versions/${functionReleaseId}`
)
const result = await invokeSafeJsonRequestFactory<GetFunctionReleaseResponse>({
logger
})({
url,
method: 'get'
})
)
return results.filter(isNonNullable)
}
export const getFunctionRelease = async (params: {
functionId: string
functionReleaseId: string
}) => {
const { functionId, functionReleaseId } = params
const url = getApiUrl(`/api/v1/functions/${functionId}/versions/${functionReleaseId}`)
const result = await invokeSafeJsonRequest<GetFunctionReleaseResponse>({
url,
method: 'get'
})
return result
? {
...result,
functionId
}
: null
}
return result
? {
...result,
functionId
}
: null
}
export type GetFunctionsParams = {
auth?: AuthCodePayload
@@ -480,29 +498,32 @@ export type GetFunctionsResponse = {
totalCount: number
}
export const getFunctions = async (params: GetFunctionsParams) => {
const url = getApiUrl(`/api/v2/functions`, {
query: {
requireRelease: true,
...params.filters
}
})
export const getFunctionsFactory =
(deps: { logger: Logger }) => async (params: GetFunctionsParams) => {
const { logger } = deps
const authToken = params.auth
? Buffer.from(
JSON.stringify({
...params.auth,
origin: getServerOrigin()
})
).toString('base64')
: undefined
const url = getApiUrl(`/api/v2/functions`, {
query: {
requireRelease: true,
...params.filters
}
})
return await invokeSafeJsonRequest<GetFunctionsResponse>({
url,
method: 'get',
token: authToken
})
}
const authToken = params.auth
? Buffer.from(
JSON.stringify({
...params.auth,
origin: getServerOrigin()
})
).toString('base64')
: undefined
return await invokeSafeJsonRequestFactory<GetFunctionsResponse>({ logger })({
url,
method: 'get',
token: authToken
})
}
export type GetPublicFunctionsResponse = {
totalCount: number
@@ -510,79 +531,94 @@ export type GetPublicFunctionsResponse = {
items: FunctionWithVersionsSchemaType[]
}
export const getPublicFunctions = async (params: {
query?: {
query?: string
cursor?: string
limit?: number
functionsWithoutVersions?: boolean
}
}) => {
const { query } = params
const url = getApiUrl(`/api/v1/functions`, {
query: {
...query,
featuredFunctionsOnly: true
export const getPublicFunctionsFactory =
(deps: { logger: Logger }) =>
async (params: {
query?: {
query?: string
cursor?: string
limit?: number
functionsWithoutVersions?: boolean
}
})
}) => {
const { logger } = deps
const { query } = params
const url = getApiUrl(`/api/v1/functions`, {
query: {
...query,
featuredFunctionsOnly: true
}
})
return await invokeSafeJsonRequest<GetPublicFunctionsResponse>({
url,
method: 'get'
})
}
return await invokeSafeJsonRequestFactory<GetFunctionsResponse>({
logger
})({
url,
method: 'get'
})
}
type GetUserFunctionsResponse = {
functions: FunctionWithVersionsSchemaType[]
}
export const getUserFunctions = async (params: {
userId: string
query?: {
query?: string
cursor?: string
limit?: number
}
body: {
speckleServerAuthenticationPayload: AuthCodePayloadWithOrigin
}
}) => {
const { userId, query, body } = params
const url = getApiUrl(`/api/v2/users/${userId}/functions`, { query })
export const getUserFunctionsFactory =
(deps: { logger: Logger }) =>
async (params: {
userId: string
query?: {
query?: string
cursor?: string
limit?: number
}
body: {
speckleServerAuthenticationPayload: AuthCodePayloadWithOrigin
}
}) => {
const { logger } = deps
const { userId, query, body } = params
const url = getApiUrl(`/api/v2/users/${userId}/functions`, { query })
return await invokeSafeJsonRequest<GetUserFunctionsResponse>({
url,
method: 'POST',
body,
retry: false
})
}
return await invokeSafeJsonRequestFactory<GetUserFunctionsResponse>({
logger
})({
url,
method: 'POST',
body,
retry: false
})
}
type GetWorkspaceFunctionsResponse = {
functions: FunctionWithVersionsSchemaType[]
}
export const getWorkspaceFunctions = async (params: {
workspaceId: string
query?: {
query?: string
cursor?: string
limit?: number
}
body: {
speckleServerAuthenticationPayload: AuthCodePayloadWithOrigin
}
}) => {
const { workspaceId, query, body } = params
const url = getApiUrl(`/api/v2/workspaces/${workspaceId}/functions`, { query })
export const getWorkspaceFunctionsFactory =
(deps: { logger: Logger }) =>
async (params: {
workspaceId: string
query?: {
query?: string
cursor?: string
limit?: number
}
body: {
speckleServerAuthenticationPayload: AuthCodePayloadWithOrigin
}
}) => {
const { logger } = deps
const { workspaceId, query, body } = params
const url = getApiUrl(`/api/v2/workspaces/${workspaceId}/functions`, { query })
return await invokeSafeJsonRequest<GetWorkspaceFunctionsResponse>({
url,
method: 'POST',
body,
retry: false
})
}
return await invokeSafeJsonRequestFactory<GetWorkspaceFunctionsResponse>({
logger
})({
url,
method: 'POST',
body,
retry: false
})
}
type UserGithubAuthStateResponse = {
userHasAuthorizedGitHubApp: boolean
@@ -3,13 +3,13 @@ import {
createFunctionWithoutVersion,
triggerAutomationRun,
updateFunction as execEngineUpdateFunction,
getFunction,
getFunctionRelease,
getPublicFunctions,
getFunctionReleases,
getFunctionFactory,
getFunctionReleaseFactory,
getPublicFunctionsFactory,
getFunctionReleasesFactory,
getUserGithubAuthState,
getUserGithubOrganizations,
getUserFunctions
getUserFunctionsFactory
} from '@/modules/automate/clients/executionEngine'
import {
GetProjectAutomationsParams,
@@ -459,10 +459,12 @@ export = (FF_AUTOMATE_MODULE_ENABLED
}
},
AutomateFunction: {
async releases(parent, args) {
async releases(parent, args, context) {
try {
// TODO: Replace w/ dataloader batch call, when/if possible
const fn = await getFunction({
const fn = await getFunctionFactory({
logger: context.log
})({
functionId: parent.id,
releases:
args?.cursor || args?.filter?.search || args?.limit
@@ -567,10 +569,11 @@ export = (FF_AUTOMATE_MODULE_ENABLED
async updateFunction(_parent, args, ctx) {
const update = updateFunctionFactory({
updateFunction: execEngineUpdateFunction,
getFunction,
getFunction: getFunctionFactory({ logger: ctx.log }),
createStoredAuthCode: createStoredAuthCodeFactory({
redis: getGenericRedis()
})
}),
logger: ctx.log
})
return await update({ input: args.input, userId: ctx.userId! })
}
@@ -621,12 +624,12 @@ export = (FF_AUTOMATE_MODULE_ENABLED
getAutomation: getAutomationFactory({ db: projectDb }),
storeAutomationRevision: storeAutomationRevisionFactory({ db: projectDb }),
getBranchesByIds: getBranchesByIdsFactory({ db: projectDb }),
getFunctionRelease,
getFunctionRelease: getFunctionReleaseFactory({ logger: ctx.log }),
getEncryptionKeyPair,
getFunctionInputDecryptor: getFunctionInputDecryptorFactory({
buildDecryptor
}),
getFunctionReleases,
getFunctionReleases: getFunctionReleasesFactory({ logger: ctx.log }),
eventEmit: getEventBus().emit,
validateStreamAccess
})
@@ -679,7 +682,7 @@ export = (FF_AUTOMATE_MODULE_ENABLED
const create = createTestAutomationFactory({
getEncryptionKeyPair,
getFunction,
getFunction: getFunctionFactory({ logger: ctx.log }),
storeAutomation: storeAutomationFactory({ db: projectDb }),
storeAutomationRevision: storeAutomationRevisionFactory({ db: projectDb }),
validateStreamAccess,
@@ -730,10 +733,11 @@ export = (FF_AUTOMATE_MODULE_ENABLED
}
},
Query: {
async automateValidateAuthCode(_parent, args) {
async automateValidateAuthCode(_parent, args, ctx) {
const validate = validateStoredAuthCodeFactory({
redis: getGenericRedis(),
emit: getEventBus().emit
emit: getEventBus().emit,
logger: ctx.log
})
const payload = removeNullOrUndefinedKeys(args.payload)
const resources = removeNullOrUndefinedKeys(args.resources ?? {})
@@ -755,9 +759,11 @@ export = (FF_AUTOMATE_MODULE_ENABLED
return convertFunctionToGraphQLReturn(fn)
},
async automateFunctions(_parent, args) {
async automateFunctions(_parent, args, ctx) {
try {
const res = await getPublicFunctions({
const res = await getPublicFunctionsFactory({
logger: ctx.log
})({
query: {
query: args.filter?.search || undefined,
cursor: args.cursor || undefined,
@@ -809,7 +815,9 @@ export = (FF_AUTOMATE_MODULE_ENABLED
action: AuthCodePayloadAction.ListUserFunctions
})
const res = await getUserFunctions({
const res = await getUserFunctionsFactory({
logger: context.log
})({
userId: context.userId!,
query: {
query: args.filter?.search || undefined,
+2 -2
View File
@@ -17,7 +17,7 @@ import {
import { isNonNullable, Scopes } from '@speckle/shared'
import { registerOrUpdateScopeFactory } from '@/modules/shared/repositories/scopes'
import {
getFunction,
getFunctionFactory,
triggerAutomationRun
} from '@/modules/automate/clients/executionEngine'
import logStreamRest from '@/modules/automate/rest/logStream'
@@ -289,7 +289,7 @@ const initializeEventListeners = () => {
const fn = isTestEnv()
? null
: await getFunction({ functionId: functionRun.functionId })
: await getFunctionFactory({ logger })({ functionId: functionRun.functionId })
const userEmail = await getUserEmailFromAutomationRunFactory({
getFullAutomationRevisionMetadata: getFullAutomationRevisionMetadataFactory({
@@ -5,6 +5,7 @@ import { EventBus } from '@/modules/shared/services/eventBus'
import cryptoRandomString from 'crypto-random-string'
import Redis from 'ioredis'
import { get, has, isObjectLike } from 'lodash'
import { Logger } from 'pino'
export enum AuthCodePayloadAction {
CreateAutomation = 'createAutomation',
@@ -47,14 +48,14 @@ export const createStoredAuthCodeFactory =
}
export const validateStoredAuthCodeFactory =
(deps: { redis: Redis; emit: EventBus['emit'] }) =>
(deps: { redis: Redis; logger: Logger; emit: EventBus['emit'] }) =>
async (params: {
payload: AuthCodePayload
resources?: {
workspaceId?: string
}
}) => {
const { redis, emit } = deps
const { redis, logger, emit } = deps
const { payload, resources } = params
const potentialPayloadString = await redis.get(payload.code)
@@ -63,6 +64,17 @@ export const validateStoredAuthCodeFactory =
: null
const formattedPayload = isPayload(potentialPayload) ? potentialPayload : null
logger.info(
{
payloadString: potentialPayloadString,
payload: {
...formattedPayload,
code: null
}
},
'Validating execution engine request with provided auth payload.'
)
if (
!formattedPayload ||
formattedPayload.code !== payload.code ||
@@ -7,9 +7,9 @@ import { getServerOrigin } from '@/modules/shared/helpers/envHelper'
import cryptoRandomString from 'crypto-random-string'
import {
createAutomation as clientCreateAutomation,
getFunction,
getFunctionRelease,
getFunctionReleases
getFunctionFactory,
getFunctionReleaseFactory,
getFunctionReleasesFactory
} from '@/modules/automate/clients/executionEngine'
import { Automate, Roles, removeNullOrUndefinedKeys } from '@speckle/shared'
import { AuthCodePayloadAction } from '@/modules/automate/services/authCode'
@@ -139,7 +139,7 @@ export const createAutomationFactory =
export type CreateTestAutomationDeps = {
getEncryptionKeyPair: GetEncryptionKeyPair
getFunction: typeof getFunction
getFunction: ReturnType<typeof getFunctionFactory>
storeAutomation: StoreAutomation
storeAutomationRevision: StoreAutomationRevision
validateStreamAccess: ValidateStreamAccess
@@ -356,7 +356,7 @@ const validateNewTriggerDefinitions =
}
type ValidateNewRevisionFunctionsDeps = {
getFunctionRelease: typeof getFunctionRelease
getFunctionRelease: ReturnType<typeof getFunctionReleaseFactory>
}
const validateNewRevisionFunctions =
@@ -399,7 +399,7 @@ export type CreateAutomationRevisionDeps = {
storeAutomationRevision: StoreAutomationRevision
getEncryptionKeyPair: GetEncryptionKeyPair
getFunctionInputDecryptor: FunctionInputDecryptor
getFunctionReleases: typeof getFunctionReleases
getFunctionReleases: ReturnType<typeof getFunctionReleasesFactory>
validateStreamAccess: ValidateStreamAccess
eventEmit: EventBusEmit
} & ValidateNewTriggerDefinitionsDeps &
@@ -2,7 +2,7 @@ import {
CreateFunctionBody,
ExecutionEngineFunctionTemplateId,
createFunction,
getFunction,
getFunctionFactory,
updateFunction as updateExecEngineFunction
} from '@/modules/automate/clients/executionEngine'
import {
@@ -43,7 +43,7 @@ import {
speckleAutomateUrl
} from '@/modules/shared/helpers/envHelper'
import { getFunctionsMarketplaceUrl } from '@/modules/core/helpers/routeHelper'
import { automateLogger } from '@/observability/logging'
import { automateLogger, Logger } from '@/observability/logging'
import { CreateStoredAuthCode } from '@/modules/automate/domain/operations'
import { GetUser } from '@/modules/core/domain/users/operations'
import { noop } from 'lodash'
@@ -195,17 +195,18 @@ export const createFunctionFromTemplateFactory =
export type UpdateFunctionDeps = {
updateFunction: typeof updateExecEngineFunction
getFunction: typeof getFunction
getFunction: ReturnType<typeof getFunctionFactory>
createStoredAuthCode: CreateStoredAuthCode
logger: Logger
}
export const updateFunctionFactory =
(deps: UpdateFunctionDeps) =>
async (params: { input: UpdateAutomateFunctionInput; userId: string }) => {
const { updateFunction, createStoredAuthCode } = deps
const { updateFunction, createStoredAuthCode, logger } = deps
const { input, userId } = params
const existingFn = await getFunction({ functionId: input.id })
const existingFn = await getFunctionFactory({ logger })({ functionId: input.id })
if (!existingFn) {
throw new AutomateFunctionUpdateError('Function not found')
}
@@ -71,8 +71,8 @@ import {
getRevisionsTriggerDefinitionsFactory
} from '@/modules/automate/repositories/automations'
import {
getFunction,
getFunctionReleases
getFunctionFactory,
getFunctionReleasesFactory
} from '@/modules/automate/clients/executionEngine'
import {
FunctionReleaseSchemaType,
@@ -96,6 +96,7 @@ import {
CommitWithStreamBranchId,
CommitWithStreamBranchMetadata
} from '@/modules/core/domain/commits/types'
import { logger } from '@/observability/logging'
declare module '@/modules/core/loaders' {
interface ModularizedDataLoaders extends ReturnType<typeof dataLoadersDefinition> {}
@@ -618,7 +619,7 @@ const dataLoadersDefinition = defineRequestDataloaders(
const results = await Promise.all(
fnIds.map(async (fnId) => {
try {
return await getFunction({ functionId: fnId })
return await getFunctionFactory({ logger })({ functionId: fnId })
} catch (e) {
const isNotFound =
e instanceof ExecutionEngineFailedResponseError &&
@@ -642,7 +643,7 @@ const dataLoadersDefinition = defineRequestDataloaders(
>(
async (keys) => {
const results = keyBy(
await getFunctionReleases({
await getFunctionReleasesFactory({ logger })({
ids: keys.map(([fnId, fnReleaseId]) => ({
functionId: fnId,
functionReleaseId: fnReleaseId
@@ -169,7 +169,7 @@ import {
listWorkspaceSsoMembershipsFactory
} from '@/modules/workspaces/repositories/sso'
import { getDecryptor } from '@/modules/workspaces/helpers/sso'
import { getFunctions } from '@/modules/automate/clients/executionEngine'
import { getFunctionsFactory } from '@/modules/automate/clients/executionEngine'
import {
ExecutionEngineFailedResponseError,
ExecutionEngineNetworkError
@@ -1093,7 +1093,9 @@ export = FF_WORKSPACES_MODULE_ENABLED
action: AuthCodePayloadAction.ListWorkspaceFunctions
})
const res = await getFunctions({
const res = await getFunctionsFactory({
logger: context.log
})({
auth: authCode,
filters: {
query: args.filter?.search ?? undefined,