fix(server/prometheus): ensure consistent Prometheus registry is used (#4106)
* fix(server/prometheus): ensure consistent Prometheus registry is used - there was a conflicting dependency between Metrics initialization and Modules initialization; resolved by separating registry initialization from metrics initialization - pass in the registry to prevent implicit dependency being broken - when registering a metric, first attempt to remove any of existing metrics with same name to prevent errors - to prevent sneaky uses of the implicit registry, replace default import with explicit import so it is clearer when prometheusClient.registry is used * Add tests for registering metrics
This commit is contained in:
+27
-12
@@ -10,7 +10,7 @@ import 'express-async-errors'
|
||||
import cookieParser from 'cookie-parser'
|
||||
|
||||
import { createTerminus } from '@godaddy/terminus'
|
||||
import Metrics from '@/observability'
|
||||
import Metrics, { initPrometheusRegistry } from '@/observability'
|
||||
import {
|
||||
startupLogger,
|
||||
shutdownLogger,
|
||||
@@ -23,8 +23,8 @@ import {
|
||||
sanitizeHeaders
|
||||
} from '@/observability/components/express/expressLogging'
|
||||
|
||||
import { errorMetricsMiddleware } from '@/observability/components/express/metrics/errorMetrics'
|
||||
import prometheusClient from 'prom-client'
|
||||
import { errorMetricsMiddlewareFactory } from '@/observability/components/express/metrics/errorMetrics'
|
||||
import prometheusClient, { Registry } from 'prom-client'
|
||||
|
||||
import { ApolloServer } from '@apollo/server'
|
||||
import { expressMiddleware } from '@apollo/server/express4'
|
||||
@@ -103,9 +103,11 @@ const isWsServer = (server: http.Server | MockWsServer): server is MockWsServer
|
||||
* is that graphql-ws uses an entirely different protocol, so the client-side has to change as well, and so old clients
|
||||
* will be unable to use any WebSocket/subscriptions functionality with the updated server
|
||||
*/
|
||||
export function buildApolloSubscriptionServer(
|
||||
export function buildApolloSubscriptionServer(params: {
|
||||
server: http.Server | MockWsServer
|
||||
): SubscriptionServer {
|
||||
registers?: Registry[]
|
||||
}): SubscriptionServer {
|
||||
const { server, registers } = params
|
||||
const httpServer = isWsServer(server) ? undefined : server
|
||||
const mockServer = isWsServer(server) ? server : undefined
|
||||
|
||||
@@ -118,7 +120,9 @@ export function buildApolloSubscriptionServer(
|
||||
metricConnectedClients,
|
||||
metricSubscriptionTotalOperations,
|
||||
metricSubscriptionTotalResponses
|
||||
} = initApolloSubscriptionMonitoring()
|
||||
} = initApolloSubscriptionMonitoring({
|
||||
registers: registers ?? [prometheusClient.register]
|
||||
})
|
||||
|
||||
const getHeaders = (params: {
|
||||
connContext?: PossiblyMockedConnectionContext
|
||||
@@ -256,7 +260,7 @@ export async function buildApolloServer(options?: {
|
||||
schema,
|
||||
plugins: [
|
||||
statusCodePlugin,
|
||||
loggingPluginFactory({ register: prometheusClient.register }),
|
||||
loggingPluginFactory({ registers: [prometheusClient.register] }),
|
||||
ApolloServerPluginLandingPageLocalDefault({
|
||||
embed: true,
|
||||
includeCookies: true
|
||||
@@ -305,6 +309,8 @@ export async function init() {
|
||||
startupLogger.info('🖼️ Serving for frontend-2...')
|
||||
|
||||
const app = express()
|
||||
const promRegister = initPrometheusRegistry() // has to be called before both Metrics and Modules are initialized
|
||||
|
||||
app.disable('x-powered-by')
|
||||
|
||||
// Moves things along automatically on restart.
|
||||
@@ -352,11 +358,14 @@ export async function init() {
|
||||
// Metrics relies on 'regions' table in the database, so much be initialized after migrations in the main database ("migrateDbToLatest({ region: 'main'," etc..)
|
||||
// It also relies on the regional knex clients, which will initialize and run migrations in the respective regions.
|
||||
// It must be initialized after the multiregion module is initialized in ModulesSetup.init
|
||||
await Metrics(app)
|
||||
await Metrics({ app, registry: promRegister })
|
||||
|
||||
// Init HTTP server & subscription server
|
||||
const server = http.createServer(app)
|
||||
const subscriptionServer = buildApolloSubscriptionServer(server)
|
||||
const subscriptionServer = buildApolloSubscriptionServer({
|
||||
server,
|
||||
registers: [promRegister]
|
||||
})
|
||||
|
||||
// Initialize graphql server
|
||||
const graphqlServer = await buildApolloServer({
|
||||
@@ -370,12 +379,13 @@ export async function init() {
|
||||
)
|
||||
|
||||
// At the very end adding default error handler middleware
|
||||
app.use(errorMetricsMiddleware)
|
||||
app.use(errorMetricsMiddlewareFactory({ promRegisters: [promRegister] }))
|
||||
app.use(defaultErrorHandler)
|
||||
|
||||
return {
|
||||
app,
|
||||
graphqlServer,
|
||||
registers: [promRegister],
|
||||
server,
|
||||
subscriptionServer,
|
||||
readinessCheck: healthchecks.isReady
|
||||
@@ -414,11 +424,13 @@ async function createFrontendProxy() {
|
||||
export async function startHttp(params: {
|
||||
server: http.Server
|
||||
app: Express
|
||||
registers?: Registry[]
|
||||
graphqlServer: ApolloServer<GraphQLContext>
|
||||
readinessCheck: ReadinessHandler
|
||||
customPortOverride?: number
|
||||
}) {
|
||||
const { server, app, graphqlServer, readinessCheck, customPortOverride } = params
|
||||
const { server, app, registers, graphqlServer, readinessCheck, customPortOverride } =
|
||||
params
|
||||
let bindAddress = getBindAddress() // defaults to 127.0.0.1
|
||||
let port = getPort() // defaults to 3000
|
||||
|
||||
@@ -436,7 +448,10 @@ export async function startHttp(params: {
|
||||
bindAddress = getBindAddress('0.0.0.0')
|
||||
}
|
||||
|
||||
monitorActiveConnections(server)
|
||||
monitorActiveConnections({
|
||||
httpServer: server,
|
||||
registers: registers ?? [prometheusClient.register]
|
||||
})
|
||||
|
||||
app.set('port', port)
|
||||
|
||||
|
||||
@@ -11,8 +11,8 @@ const { logger } = require('../observability/logging')
|
||||
const { init, startHttp } = require('../app')
|
||||
|
||||
init()
|
||||
.then(({ app, graphqlServer, server, readinessCheck }) =>
|
||||
startHttp({ app, graphqlServer, server, readinessCheck })
|
||||
.then(({ app, graphqlServer, registers, server, readinessCheck }) =>
|
||||
startHttp({ app, graphqlServer, registers, server, readinessCheck })
|
||||
)
|
||||
.catch((err) => {
|
||||
logger.error(err, 'Failed to start server. Exiting with non-zero exit code...')
|
||||
|
||||
@@ -5,8 +5,8 @@ const { logger } = require('../dist/observability/logging')
|
||||
const { init, startHttp } = require('../dist/app')
|
||||
|
||||
init()
|
||||
.then(({ app, graphqlServer, server, readinessCheck }) =>
|
||||
startHttp({ app, graphqlServer, server, readinessCheck })
|
||||
.then(({ app, graphqlServer, registers, server, readinessCheck }) =>
|
||||
startHttp({ app, graphqlServer, registers, server, readinessCheck })
|
||||
)
|
||||
.catch((err) => {
|
||||
logger.error(err, 'Failed to start server. Exiting with non-zero exit code...')
|
||||
|
||||
@@ -29,14 +29,14 @@ const isFieldNode = (node: SelectionNode): node is FieldNode => node.kind === 'F
|
||||
let metricCallCount: Counter<string>
|
||||
|
||||
export const loggingPluginFactory: (deps: {
|
||||
register: Registry
|
||||
registers: Registry[]
|
||||
}) => ApolloServerPlugin<GraphQLContext> = (deps) => ({
|
||||
serverWillStart: async () => {
|
||||
deps.register.removeSingleMetric('speckle_server_apollo_calls')
|
||||
deps.registers.forEach((r) => r.removeSingleMetric('speckle_server_apollo_calls'))
|
||||
metricCallCount = new Counter({
|
||||
name: 'speckle_server_apollo_calls',
|
||||
help: 'Number of calls',
|
||||
registers: [deps.register],
|
||||
registers: deps.registers,
|
||||
labelNames: ['actionName']
|
||||
})
|
||||
},
|
||||
|
||||
+25
-22
@@ -1,15 +1,14 @@
|
||||
import prometheusClient from 'prom-client'
|
||||
import { Counter, Gauge, Registry } from 'prom-client'
|
||||
|
||||
let apolloSubscriptionMonitoringIsInitialized = false
|
||||
|
||||
let metricConnectCounter: prometheusClient.Counter<string>
|
||||
let metricConnectedClients: prometheusClient.Gauge<string>
|
||||
let metricSubscriptionTotalOperations: prometheusClient.Counter<'subscriptionType'>
|
||||
let metricSubscriptionTotalResponses: prometheusClient.Counter<
|
||||
'subscriptionType' | 'status'
|
||||
>
|
||||
let metricConnectCounter: Counter<string>
|
||||
let metricConnectedClients: Gauge<string>
|
||||
let metricSubscriptionTotalOperations: Counter<'subscriptionType'>
|
||||
let metricSubscriptionTotalResponses: Counter<'subscriptionType' | 'status'>
|
||||
|
||||
export const initApolloSubscriptionMonitoring = () => {
|
||||
export const initApolloSubscriptionMonitoring = (params: { registers: Registry[] }) => {
|
||||
const { registers } = params
|
||||
if (apolloSubscriptionMonitoringIsInitialized)
|
||||
return {
|
||||
metricConnectCounter,
|
||||
@@ -19,33 +18,37 @@ export const initApolloSubscriptionMonitoring = () => {
|
||||
}
|
||||
|
||||
// Init metrics
|
||||
prometheusClient.register.removeSingleMetric('speckle_server_apollo_connect')
|
||||
metricConnectCounter = new prometheusClient.Counter({
|
||||
registers.forEach((r) => r.removeSingleMetric('speckle_server_apollo_connect'))
|
||||
metricConnectCounter = new Counter({
|
||||
name: 'speckle_server_apollo_connect',
|
||||
help: 'Number of connects'
|
||||
help: 'Number of connects',
|
||||
registers
|
||||
})
|
||||
prometheusClient.register.removeSingleMetric('speckle_server_apollo_clients')
|
||||
metricConnectedClients = new prometheusClient.Gauge({
|
||||
registers.forEach((r) => r.removeSingleMetric('speckle_server_apollo_clients'))
|
||||
metricConnectedClients = new Gauge({
|
||||
name: 'speckle_server_apollo_clients',
|
||||
help: 'Number of currently connected clients'
|
||||
help: 'Number of currently connected clients',
|
||||
registers
|
||||
})
|
||||
|
||||
prometheusClient.register.removeSingleMetric(
|
||||
'speckle_server_apollo_graphql_total_subscription_operations'
|
||||
registers.forEach((r) =>
|
||||
r.removeSingleMetric('speckle_server_apollo_graphql_total_subscription_operations')
|
||||
)
|
||||
metricSubscriptionTotalOperations = new prometheusClient.Counter({
|
||||
metricSubscriptionTotalOperations = new Counter({
|
||||
name: 'speckle_server_apollo_graphql_total_subscription_operations',
|
||||
help: 'Number of total subscription operations served by this instance',
|
||||
labelNames: ['subscriptionType'] as const
|
||||
labelNames: ['subscriptionType'] as const,
|
||||
registers
|
||||
})
|
||||
|
||||
prometheusClient.register.removeSingleMetric(
|
||||
'speckle_server_apollo_graphql_total_subscription_responses'
|
||||
registers.forEach((r) =>
|
||||
r.removeSingleMetric('speckle_server_apollo_graphql_total_subscription_responses')
|
||||
)
|
||||
metricSubscriptionTotalResponses = new prometheusClient.Counter({
|
||||
metricSubscriptionTotalResponses = new Counter({
|
||||
name: 'speckle_server_apollo_graphql_total_subscription_responses',
|
||||
help: 'Number of total subscription responses served by this instance',
|
||||
labelNames: ['subscriptionType', 'status'] as const
|
||||
labelNames: ['subscriptionType', 'status'] as const,
|
||||
registers
|
||||
})
|
||||
|
||||
apolloSubscriptionMonitoringIsInitialized = true
|
||||
|
||||
@@ -1,22 +1,22 @@
|
||||
/* istanbul ignore file */
|
||||
import type { Nullable } from '@speckle/shared'
|
||||
import prometheusClient from 'prom-client'
|
||||
import type express from 'express'
|
||||
import { Counter, type Registry } from 'prom-client'
|
||||
import type { ErrorRequestHandler } from 'express'
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
let metricErrorCount: Nullable<prometheusClient.Counter<any>> = null
|
||||
let metricErrorCount: Nullable<Counter<'route'>> = null
|
||||
|
||||
export const errorMetricsMiddleware: express.ErrorRequestHandler = (
|
||||
err,
|
||||
req,
|
||||
res,
|
||||
next
|
||||
) => {
|
||||
export const errorMetricsMiddlewareFactory: (params: {
|
||||
promRegisters: Registry[]
|
||||
}) => ErrorRequestHandler = (params) => (err, req, res, next) => {
|
||||
if (metricErrorCount === null) {
|
||||
metricErrorCount = new prometheusClient.Counter({
|
||||
params.promRegisters.forEach((register) => {
|
||||
register.removeSingleMetric('speckle_server_request_errors')
|
||||
})
|
||||
metricErrorCount = new Counter({
|
||||
name: 'speckle_server_request_errors',
|
||||
help: 'Number of requests that threw exceptions',
|
||||
labelNames: ['route']
|
||||
labelNames: ['route'],
|
||||
registers: params.promRegisters
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -41,15 +41,17 @@ type MetricConfig = {
|
||||
}
|
||||
|
||||
export const heapSizeAndUsed = (
|
||||
registry: Registry,
|
||||
registers: Registry[],
|
||||
config: MetricConfig = {}
|
||||
): Metric => {
|
||||
const registers = registry ? [registry] : undefined
|
||||
const namePrefix = config.prefix ?? ''
|
||||
const labels = config.labels ?? {}
|
||||
const labelNames = Object.keys(labels)
|
||||
const buckets = { ...DEFAULT_NODEJS_HEAP_SIZE_BUCKETS, ...config.buckets }
|
||||
|
||||
registers.forEach((r) => {
|
||||
r.removeSingleMetric(namePrefix + NODEJS_HEAP_SIZE_TOTAL)
|
||||
})
|
||||
const heapSizeTotal = new Histogram({
|
||||
name: namePrefix + NODEJS_HEAP_SIZE_TOTAL,
|
||||
help: 'Process heap size from Node.js in bytes. This data is collected at a higher frequency than Prometheus scrapes, and is presented as a Histogram.',
|
||||
@@ -57,6 +59,10 @@ export const heapSizeAndUsed = (
|
||||
buckets: buckets.NODEJS_HEAP_SIZE_TOTAL,
|
||||
labelNames
|
||||
})
|
||||
|
||||
registers.forEach((r) => {
|
||||
r.removeSingleMetric(namePrefix + NODEJS_HEAP_SIZE_USED)
|
||||
})
|
||||
const heapSizeUsed = new Histogram({
|
||||
name: namePrefix + NODEJS_HEAP_SIZE_USED,
|
||||
help: 'Process heap size used from Node.js in bytes. This data is collected at a higher frequency than Prometheus scrapes, and is presented as a Histogram.',
|
||||
@@ -64,6 +70,10 @@ export const heapSizeAndUsed = (
|
||||
buckets: buckets.NODEJS_HEAP_SIZE_USED,
|
||||
labelNames
|
||||
})
|
||||
|
||||
registers.forEach((r) => {
|
||||
r.removeSingleMetric(namePrefix + NODEJS_EXTERNAL_MEMORY)
|
||||
})
|
||||
const externalMemUsed = new Histogram({
|
||||
name: namePrefix + NODEJS_EXTERNAL_MEMORY,
|
||||
help: 'Node.js external memory size in bytes. This data is collected at a higher frequency than Prometheus scrapes, and is presented as a Histogram.',
|
||||
|
||||
+8
-6
@@ -24,23 +24,25 @@ type HighFrequencyMonitor = {
|
||||
}
|
||||
|
||||
export const initHighFrequencyMonitoring = (params: {
|
||||
register: Registry
|
||||
registers: Registry[]
|
||||
collectionPeriodMilliseconds: number
|
||||
config: MetricConfig
|
||||
}): HighFrequencyMonitor => {
|
||||
const { register, collectionPeriodMilliseconds } = params
|
||||
const { registers, collectionPeriodMilliseconds } = params
|
||||
const config = params.config
|
||||
const registers = register ? [register] : undefined
|
||||
const namePrefix = config.prefix ?? ''
|
||||
const labels = config.labels ?? {}
|
||||
const labelNames = Object.keys(labels)
|
||||
|
||||
const metrics = [
|
||||
processCpuTotal(register, config),
|
||||
heapSizeAndUsed(register, config),
|
||||
knexConnections(register, config)
|
||||
processCpuTotal(registers, config),
|
||||
heapSizeAndUsed(registers, config),
|
||||
knexConnections(registers, config)
|
||||
]
|
||||
|
||||
registers.forEach((r) => {
|
||||
r.removeSingleMetric(namePrefix + 'self_monitor_time_high_frequency')
|
||||
})
|
||||
const selfMonitor = new Histogram({
|
||||
name: namePrefix + 'self_monitor_time_high_frequency',
|
||||
help: 'The time taken to collect all of the high frequency metrics, seconds.',
|
||||
|
||||
+22
-2
@@ -36,13 +36,18 @@ type MetricConfig = {
|
||||
>
|
||||
}
|
||||
|
||||
export const knexConnections = (registry: Registry, config: MetricConfig): Metric => {
|
||||
const registers = registry ? [registry] : undefined
|
||||
export const knexConnections = (
|
||||
registers: Registry[],
|
||||
config: MetricConfig
|
||||
): Metric => {
|
||||
const namePrefix = config.prefix ?? ''
|
||||
const labels = config.labels ?? {}
|
||||
const labelNames = [...Object.keys(labels), 'region']
|
||||
const buckets = { ...DEFAULT_KNEX_TOTAL_BUCKETS, ...config.buckets }
|
||||
|
||||
registers.forEach((r) => {
|
||||
r.removeSingleMetric(namePrefix + KNEX_CONNECTIONS_FREE)
|
||||
})
|
||||
const knexConnectionsFree = new Histogram({
|
||||
name: namePrefix + KNEX_CONNECTIONS_FREE,
|
||||
help: 'Number of free DB connections. This data is collected at a higher frequency than Prometheus scrapes, and is presented as a Histogram.',
|
||||
@@ -51,6 +56,9 @@ export const knexConnections = (registry: Registry, config: MetricConfig): Metri
|
||||
labelNames
|
||||
})
|
||||
|
||||
registers.forEach((r) => {
|
||||
r.removeSingleMetric(namePrefix + KNEX_CONNECTIONS_USED)
|
||||
})
|
||||
const knexConnectionsUsed = new Histogram({
|
||||
name: namePrefix + KNEX_CONNECTIONS_USED,
|
||||
help: 'Number of used DB connections',
|
||||
@@ -59,6 +67,9 @@ export const knexConnections = (registry: Registry, config: MetricConfig): Metri
|
||||
labelNames
|
||||
})
|
||||
|
||||
registers.forEach((r) => {
|
||||
r.removeSingleMetric(namePrefix + KNEX_PENDING_ACQUIRES)
|
||||
})
|
||||
const knexPendingAcquires = new Histogram({
|
||||
name: namePrefix + KNEX_PENDING_ACQUIRES,
|
||||
help: 'Number of pending DB connection aquires',
|
||||
@@ -67,6 +78,9 @@ export const knexConnections = (registry: Registry, config: MetricConfig): Metri
|
||||
labelNames
|
||||
})
|
||||
|
||||
registers.forEach((r) => {
|
||||
r.removeSingleMetric(namePrefix + KNEX_PENDING_CREATES)
|
||||
})
|
||||
const knexPendingCreates = new Histogram({
|
||||
name: namePrefix + KNEX_PENDING_CREATES,
|
||||
help: 'Number of pending DB connection creates',
|
||||
@@ -75,6 +89,9 @@ export const knexConnections = (registry: Registry, config: MetricConfig): Metri
|
||||
labelNames
|
||||
})
|
||||
|
||||
registers.forEach((r) => {
|
||||
r.removeSingleMetric(namePrefix + KNEX_PENDING_VALIDATIONS)
|
||||
})
|
||||
const knexPendingValidations = new Histogram({
|
||||
name: namePrefix + KNEX_PENDING_VALIDATIONS,
|
||||
help: 'Number of pending DB connection validations. This is a state between pending acquisition and acquiring a connection.',
|
||||
@@ -83,6 +100,9 @@ export const knexConnections = (registry: Registry, config: MetricConfig): Metri
|
||||
labelNames
|
||||
})
|
||||
|
||||
registers.forEach((r) => {
|
||||
r.removeSingleMetric(namePrefix + KNEX_REMAINING_CAPACITY)
|
||||
})
|
||||
const knexRemainingCapacity = new Histogram({
|
||||
name: namePrefix + KNEX_REMAINING_CAPACITY,
|
||||
help: 'Remaining capacity of the DB connection pool',
|
||||
|
||||
@@ -41,15 +41,17 @@ type MetricConfig = {
|
||||
}
|
||||
|
||||
export const processCpuTotal = (
|
||||
registry: Registry,
|
||||
registers: Registry[],
|
||||
config: MetricConfig = {}
|
||||
): Metric => {
|
||||
const registers = registry ? [registry] : undefined
|
||||
const namePrefix = config.prefix ?? ''
|
||||
const labels = config.labels ?? {}
|
||||
const labelNames = Object.keys(labels)
|
||||
const buckets = { ...DEFAULT_CPU_TOTAL_BUCKETS, ...config.buckets }
|
||||
|
||||
registers.forEach((r) => {
|
||||
r.removeSingleMetric(namePrefix + PROCESS_CPU_USER_SECONDS)
|
||||
})
|
||||
const cpuUserUsageHistogram = new Histogram({
|
||||
name: namePrefix + PROCESS_CPU_USER_SECONDS,
|
||||
help: 'Total user CPU time spent in seconds. This data is collected at a higher frequency than Prometheus scrapes, and is presented as a Histogram.',
|
||||
@@ -57,6 +59,9 @@ export const processCpuTotal = (
|
||||
buckets: buckets.PROCESS_CPU_USER_SECONDS,
|
||||
registers
|
||||
})
|
||||
registers.forEach((r) => {
|
||||
r.removeSingleMetric(namePrefix + PROCESS_CPU_SYSTEM_SECONDS)
|
||||
})
|
||||
const cpuSystemUsageHistogram = new Histogram({
|
||||
name: namePrefix + PROCESS_CPU_SYSTEM_SECONDS,
|
||||
help: 'Total system CPU time spent in seconds. This data is collected at a higher frequency than Prometheus scrapes, and is presented as a Histogram.',
|
||||
@@ -64,6 +69,9 @@ export const processCpuTotal = (
|
||||
buckets: buckets.PROCESS_CPU_SYSTEM_SECONDS,
|
||||
labelNames
|
||||
})
|
||||
registers.forEach((r) => {
|
||||
r.removeSingleMetric(namePrefix + PROCESS_CPU_SECONDS)
|
||||
})
|
||||
const cpuUsageHistogram = new Histogram({
|
||||
name: namePrefix + PROCESS_CPU_SECONDS,
|
||||
help: 'Total user and system CPU time spent in seconds. This data is collected at a higher frequency than Prometheus scrapes, and is presented as a Histogram.',
|
||||
|
||||
@@ -1,17 +1,21 @@
|
||||
/* istanbul ignore file */
|
||||
import type { Nullable } from '@speckle/shared'
|
||||
import prometheusClient from 'prom-client'
|
||||
import type http from 'http'
|
||||
import { Registry, Gauge } from 'prom-client'
|
||||
import type { Server } from 'http'
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
let metricActiveConnections: Nullable<prometheusClient.Gauge<any>> = null
|
||||
let metricActiveConnections: Nullable<Gauge<any>> = null
|
||||
|
||||
export const monitorActiveConnections = (httpServer: http.Server) => {
|
||||
export const monitorActiveConnections = (params: {
|
||||
httpServer: Server
|
||||
registers: Registry[]
|
||||
}) => {
|
||||
const { httpServer, registers } = params
|
||||
if (metricActiveConnections !== null) {
|
||||
prometheusClient.register.removeSingleMetric('speckle_server_active_connections')
|
||||
registers.forEach((r) => r.removeSingleMetric('speckle_server_active_connections'))
|
||||
}
|
||||
|
||||
metricActiveConnections = new prometheusClient.Gauge({
|
||||
metricActiveConnections = new Gauge({
|
||||
name: 'speckle_server_active_connections',
|
||||
help: 'Number of active http connections',
|
||||
async collect() {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import prometheusClient, { type Registry } from 'prom-client'
|
||||
import { type Registry, Summary, Counter, Gauge, Histogram } from 'prom-client'
|
||||
import { numberOfFreeConnections } from '@/modules/shared/helpers/dbHelper'
|
||||
import { type Knex } from 'knex'
|
||||
import { Logger } from 'pino'
|
||||
@@ -7,12 +7,12 @@ import { omit } from 'lodash'
|
||||
import { getRequestContext } from '@/observability/components/express/requestContext'
|
||||
import { collectLongTrace } from '@speckle/shared'
|
||||
|
||||
let metricQueryDuration: prometheusClient.Summary<string>
|
||||
let metricQueryErrors: prometheusClient.Counter<string>
|
||||
let metricConnectionAcquisitionDuration: prometheusClient.Histogram<string>
|
||||
let metricConnectionPoolErrors: prometheusClient.Counter<string>
|
||||
let metricConnectionInUseDuration: prometheusClient.Histogram<string>
|
||||
let metricConnectionPoolReapingDuration: prometheusClient.Histogram<string>
|
||||
let metricQueryDuration: Summary<string>
|
||||
let metricQueryErrors: Counter<string>
|
||||
let metricConnectionAcquisitionDuration: Histogram<string>
|
||||
let metricConnectionPoolErrors: Counter<string>
|
||||
let metricConnectionInUseDuration: Histogram<string>
|
||||
let metricConnectionPoolReapingDuration: Histogram<string>
|
||||
const initializedRegions: string[] = []
|
||||
let initializedPollingMetrics = false
|
||||
|
||||
@@ -20,13 +20,16 @@ export const initKnexPrometheusMetrics = async (params: {
|
||||
getAllDbClients: () => Promise<
|
||||
Array<{ client: Knex; isMain: boolean; regionKey: string }>
|
||||
>
|
||||
register: Registry
|
||||
registers: Registry[]
|
||||
logger: Logger
|
||||
}) => {
|
||||
const { registers } = params
|
||||
if (!initializedPollingMetrics) {
|
||||
initializedPollingMetrics = true
|
||||
new prometheusClient.Gauge({
|
||||
registers: [params.register],
|
||||
|
||||
registers.forEach((r) => r.removeSingleMetric('speckle_server_knex_free'))
|
||||
new Gauge({
|
||||
registers,
|
||||
name: 'speckle_server_knex_free',
|
||||
labelNames: ['region'],
|
||||
help: 'Number of free DB connections',
|
||||
@@ -40,8 +43,9 @@ export const initKnexPrometheusMetrics = async (params: {
|
||||
}
|
||||
})
|
||||
|
||||
new prometheusClient.Gauge({
|
||||
registers: [params.register],
|
||||
registers.forEach((r) => r.removeSingleMetric('speckle_server_knex_used'))
|
||||
new Gauge({
|
||||
registers,
|
||||
name: 'speckle_server_knex_used',
|
||||
labelNames: ['region'],
|
||||
help: 'Number of used DB connections',
|
||||
@@ -55,8 +59,9 @@ export const initKnexPrometheusMetrics = async (params: {
|
||||
}
|
||||
})
|
||||
|
||||
new prometheusClient.Gauge({
|
||||
registers: [params.register],
|
||||
registers.forEach((r) => r.removeSingleMetric('speckle_server_knex_pending'))
|
||||
new Gauge({
|
||||
registers,
|
||||
name: 'speckle_server_knex_pending',
|
||||
labelNames: ['region'],
|
||||
help: 'Number of pending DB connection aquires',
|
||||
@@ -70,8 +75,11 @@ export const initKnexPrometheusMetrics = async (params: {
|
||||
}
|
||||
})
|
||||
|
||||
new prometheusClient.Gauge({
|
||||
registers: [params.register],
|
||||
registers.forEach((r) =>
|
||||
r.removeSingleMetric('speckle_server_knex_pending_creates')
|
||||
)
|
||||
new Gauge({
|
||||
registers,
|
||||
name: 'speckle_server_knex_pending_creates',
|
||||
labelNames: ['region'],
|
||||
help: 'Number of pending DB connection creates',
|
||||
@@ -85,8 +93,11 @@ export const initKnexPrometheusMetrics = async (params: {
|
||||
}
|
||||
})
|
||||
|
||||
new prometheusClient.Gauge({
|
||||
registers: [params.register],
|
||||
registers.forEach((r) =>
|
||||
r.removeSingleMetric('speckle_server_knex_pending_validations')
|
||||
)
|
||||
new Gauge({
|
||||
registers,
|
||||
name: 'speckle_server_knex_pending_validations',
|
||||
labelNames: ['region'],
|
||||
help: 'Number of pending DB connection validations. This is a state between pending acquisition and acquiring a connection.',
|
||||
@@ -100,8 +111,11 @@ export const initKnexPrometheusMetrics = async (params: {
|
||||
}
|
||||
})
|
||||
|
||||
new prometheusClient.Gauge({
|
||||
registers: [params.register],
|
||||
registers.forEach((r) =>
|
||||
r.removeSingleMetric('speckle_server_knex_remaining_capacity')
|
||||
)
|
||||
new Gauge({
|
||||
registers,
|
||||
name: 'speckle_server_knex_remaining_capacity',
|
||||
labelNames: ['region'],
|
||||
help: 'Remaining capacity of the DB connection pool',
|
||||
@@ -115,43 +129,57 @@ export const initKnexPrometheusMetrics = async (params: {
|
||||
}
|
||||
})
|
||||
|
||||
metricQueryDuration = new prometheusClient.Summary({
|
||||
registers: [params.register],
|
||||
registers.forEach((r) => r.removeSingleMetric('speckle_server_knex_query_duration'))
|
||||
metricQueryDuration = new Summary({
|
||||
registers,
|
||||
labelNames: ['sqlMethod', 'sqlNumberBindings', 'region'],
|
||||
name: 'speckle_server_knex_query_duration',
|
||||
help: 'Summary of the DB query durations in seconds'
|
||||
})
|
||||
|
||||
metricQueryErrors = new prometheusClient.Counter({
|
||||
registers: [params.register],
|
||||
registers.forEach((r) => r.removeSingleMetric('speckle_server_knex_query_errors'))
|
||||
metricQueryErrors = new Counter({
|
||||
registers,
|
||||
labelNames: ['sqlMethod', 'sqlNumberBindings', 'region'],
|
||||
name: 'speckle_server_knex_query_errors',
|
||||
help: 'Number of DB queries with errors'
|
||||
})
|
||||
|
||||
metricConnectionAcquisitionDuration = new prometheusClient.Histogram({
|
||||
registers: [params.register],
|
||||
registers.forEach((r) =>
|
||||
r.removeSingleMetric('speckle_server_knex_connection_acquisition_duration')
|
||||
)
|
||||
metricConnectionAcquisitionDuration = new Histogram({
|
||||
registers,
|
||||
name: 'speckle_server_knex_connection_acquisition_duration',
|
||||
labelNames: ['region'],
|
||||
help: 'Summary of the DB connection acquisition duration, from request to acquire connection from pool until successfully acquired, in seconds'
|
||||
})
|
||||
|
||||
metricConnectionPoolErrors = new prometheusClient.Counter({
|
||||
registers: [params.register],
|
||||
registers.forEach((r) =>
|
||||
r.removeSingleMetric('speckle_server_knex_connection_acquisition_errors')
|
||||
)
|
||||
metricConnectionPoolErrors = new Counter({
|
||||
registers,
|
||||
name: 'speckle_server_knex_connection_acquisition_errors',
|
||||
labelNames: ['region'],
|
||||
help: 'Number of DB connection pool acquisition errors'
|
||||
})
|
||||
|
||||
metricConnectionInUseDuration = new prometheusClient.Histogram({
|
||||
registers: [params.register],
|
||||
registers.forEach((r) =>
|
||||
r.removeSingleMetric('speckle_server_knex_connection_usage_duration')
|
||||
)
|
||||
metricConnectionInUseDuration = new Histogram({
|
||||
registers,
|
||||
name: 'speckle_server_knex_connection_usage_duration',
|
||||
labelNames: ['region'],
|
||||
help: 'Summary of the DB connection duration, from successful acquisition of connection from pool until release back to pool, in seconds'
|
||||
})
|
||||
|
||||
metricConnectionPoolReapingDuration = new prometheusClient.Histogram({
|
||||
registers: [params.register],
|
||||
registers.forEach((r) =>
|
||||
r.removeSingleMetric('speckle_server_knex_connection_pool_reaping_duration')
|
||||
)
|
||||
metricConnectionPoolReapingDuration = new Histogram({
|
||||
registers,
|
||||
name: 'speckle_server_knex_connection_pool_reaping_duration',
|
||||
labelNames: ['region'],
|
||||
help: 'Summary of the DB connection pool reaping duration, in seconds. Reaping is the process of removing idle connections from the pool.'
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/* istanbul ignore file */
|
||||
import prometheusClient from 'prom-client'
|
||||
import prometheusClient, { Registry } from 'prom-client'
|
||||
import promBundle from 'express-prom-bundle'
|
||||
|
||||
import { initKnexPrometheusMetrics } from '@/observability/components/knex/knexMonitoring'
|
||||
@@ -10,18 +10,34 @@ import type express from 'express'
|
||||
import { getAllRegisteredDbClients } from '@/modules/multiregion/utils/dbSelector'
|
||||
|
||||
let prometheusInitialized = false
|
||||
let prometheusRegistryInitialized = false
|
||||
|
||||
export default async function (app: express.Express) {
|
||||
if (!prometheusInitialized) {
|
||||
prometheusInitialized = true
|
||||
/**
|
||||
* This has to be called prior to using Prometheus
|
||||
* @returns The registry of Prometheus metrics which will be served
|
||||
*/
|
||||
export function initPrometheusRegistry() {
|
||||
if (!prometheusRegistryInitialized) {
|
||||
prometheusRegistryInitialized = true
|
||||
prometheusClient.register.clear()
|
||||
prometheusClient.register.setDefaultLabels({
|
||||
project: 'speckle-server',
|
||||
app: 'server'
|
||||
})
|
||||
prometheusClient.collectDefaultMetrics()
|
||||
}
|
||||
|
||||
return prometheusClient.register
|
||||
}
|
||||
|
||||
export default async function (params: { app: express.Express; registry: Registry }) {
|
||||
const { app, registry } = params
|
||||
if (!prometheusInitialized) {
|
||||
prometheusInitialized = true
|
||||
prometheusClient.collectDefaultMetrics({
|
||||
register: registry
|
||||
})
|
||||
const highfrequencyMonitoring = initHighFrequencyMonitoring({
|
||||
register: prometheusClient.register,
|
||||
registers: [registry],
|
||||
collectionPeriodMilliseconds: highFrequencyMetricsCollectionPeriodMs(),
|
||||
config: {
|
||||
getDbClients: getAllRegisteredDbClients
|
||||
@@ -30,7 +46,7 @@ export default async function (app: express.Express) {
|
||||
highfrequencyMonitoring.start()
|
||||
|
||||
await initKnexPrometheusMetrics({
|
||||
register: prometheusClient.register,
|
||||
registers: [registry],
|
||||
getAllDbClients: getAllRegisteredDbClients,
|
||||
logger
|
||||
})
|
||||
@@ -41,7 +57,8 @@ export default async function (app: express.Express) {
|
||||
includePath: true,
|
||||
httpDurationMetricName: 'speckle_server_request_duration',
|
||||
metricType: 'summary',
|
||||
autoregister: false
|
||||
autoregister: false,
|
||||
promRegistry: registry
|
||||
})
|
||||
)
|
||||
}
|
||||
@@ -49,8 +66,8 @@ export default async function (app: express.Express) {
|
||||
// Expose prometheus metrics
|
||||
app.get('/metrics', async (req, res, next) => {
|
||||
try {
|
||||
res.set('Content-Type', prometheusClient.register.contentType)
|
||||
res.end(await prometheusClient.register.metrics())
|
||||
res.set('Content-Type', registry.contentType)
|
||||
res.end(await registry.metrics())
|
||||
} catch (ex: unknown) {
|
||||
res.status(500).end(ex instanceof Error ? ex.message : `${ex}`)
|
||||
next(ex)
|
||||
|
||||
@@ -35,6 +35,7 @@ export const testLogger = extendLoggerComponent(logger, 'test')
|
||||
export const fileUploadsLogger = extendLoggerComponent(logger, 'file-uploads')
|
||||
export const emailLogger = extendLoggerComponent(logger, 'email')
|
||||
export const taskSchedulerLogger = extendLoggerComponent(logger, 'task-scheduler')
|
||||
export const previewLogger = extendLoggerComponent(logger, 'preview')
|
||||
|
||||
export type Logger = typeof logger
|
||||
export { extendLoggerComponent, Observability }
|
||||
|
||||
@@ -0,0 +1,69 @@
|
||||
import { beforeEachContext, initializeTestServer } from '@/test/hooks'
|
||||
import { expect } from 'chai'
|
||||
|
||||
describe('Observability', () => {
|
||||
describe('Metrics', () => {
|
||||
let serverAddress: string
|
||||
before(async () => {
|
||||
const ctx = await beforeEachContext()
|
||||
;({ serverAddress } = await initializeTestServer(ctx))
|
||||
})
|
||||
|
||||
describe('Register metrics', () => {
|
||||
let metricsPageBody = ''
|
||||
before(async () => {
|
||||
const metricsResponse = await fetch(`${serverAddress}/metrics`, {
|
||||
method: 'GET'
|
||||
})
|
||||
metricsPageBody = await metricsResponse.text()
|
||||
})
|
||||
const testCases = [
|
||||
'speckle_server_apollo_calls',
|
||||
'speckle_server_request_duration',
|
||||
'speckle_server_request_errors',
|
||||
'speckle_server_active_connections',
|
||||
'speckle_server_apollo_connect',
|
||||
'speckle_server_apollo_clients',
|
||||
'speckle_server_apollo_graphql_total_subscription_operations',
|
||||
'speckle_server_apollo_graphql_total_subscription_responses',
|
||||
'speckle_server_active_connections',
|
||||
'speckle_server_knex_free',
|
||||
'speckle_server_knex_used',
|
||||
'speckle_server_knex_pending',
|
||||
'speckle_server_knex_pending_creates',
|
||||
'speckle_server_knex_pending_validations',
|
||||
'speckle_server_knex_remaining_capacity',
|
||||
'speckle_server_knex_query_duration',
|
||||
'speckle_server_knex_query_errors',
|
||||
'speckle_server_knex_connection_acquisition_duration',
|
||||
'speckle_server_knex_connection_acquisition_errors',
|
||||
'speckle_server_knex_connection_usage_duration',
|
||||
'speckle_server_knex_connection_pool_reaping_duration',
|
||||
'nodejs_heap_size_total_bytes_high_frequency',
|
||||
'nodejs_heap_size_used_bytes_high_frequency',
|
||||
'nodejs_external_memory_bytes_high_frequency',
|
||||
'self_monitor_time_high_frequency',
|
||||
'knex_connections_free_high_frequency',
|
||||
'knex_connections_used_high_frequency',
|
||||
'knex_pending_acquires_high_frequency',
|
||||
'knex_pending_creates_high_frequency',
|
||||
'knex_pending_validations_high_frequency',
|
||||
'knex_remaining_capacity_high_frequency',
|
||||
'process_cpu_user_seconds_total_high_frequency',
|
||||
'process_cpu_system_seconds_total_high_frequency',
|
||||
'process_cpu_seconds_total_high_frequency'
|
||||
]
|
||||
|
||||
testCases.forEach((testCase) =>
|
||||
it(`should register metric ${testCase}`, async () => {
|
||||
const re = new RegExp(String.raw`(^${testCase}.*)\}\s([\d]+)$`, 'gm')
|
||||
const match = [...metricsPageBody.matchAll(re)]
|
||||
if (!match) {
|
||||
expect(match).not.to.be.null
|
||||
return '' //HACK force correct type below
|
||||
}
|
||||
})
|
||||
)
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -290,7 +290,7 @@ export const testApolloSubscriptionServer = async () => {
|
||||
set(mockWsServer, 'removeListener', mockWsServer.off.bind(mockWsServer)) // backwards compat w/ subscriptions-transport-ws
|
||||
|
||||
const mockWs = MockSocket.WebSocket as unknown as ws.WebSocket
|
||||
const apolloSubServer = buildApolloSubscriptionServer(mockWsServer)
|
||||
const apolloSubServer = buildApolloSubscriptionServer({ server: mockWsServer })
|
||||
|
||||
// weakRef to ensure we dont prevent garbage collection
|
||||
const clients: WeakRef<SubscriptionClient>[] = []
|
||||
|
||||
Reference in New Issue
Block a user