feat: removed sub/replication/worker related observability (#5384)

This commit is contained in:
Daniel Gak Anagrov
2025-09-04 16:07:05 +01:00
committed by GitHub
parent 8629bf9e95
commit ea1377aaa1
13 changed files with 0 additions and 548 deletions
@@ -1,33 +0,0 @@
import prometheusClient from 'prom-client'
import { join } from 'lodash-es'
import type { MetricInitializer } from '@/observability/types.js'
export const init: MetricInitializer = (config) => {
const { labelNames, namePrefix, logger } = config
const metric = new prometheusClient.Gauge({
name: join([namePrefix, 'db', 'max_logical_replication_workers'], '_'),
help: 'Configured value of max_logical_replication_workers for the Postgres database',
labelNames: ['region', ...labelNames]
})
return async (params) => {
const { dbClients, labels } = params
await Promise.all(
dbClients.map(async ({ client, regionKey }) => {
const queryResults = await client.raw<{
rows: [{ max_logical_replication_workers: string }]
}>(`SHOW max_logical_replication_workers;`)
if (!queryResults.rows.length) {
logger.error(
{ region: regionKey },
"No max_logical_replication_workers found for region '{region}'. This is odd."
)
return
}
metric.set(
{ ...labels, region: regionKey },
parseInt(queryResults.rows[0].max_logical_replication_workers)
)
})
)
}
}
@@ -1,33 +0,0 @@
import prometheusClient from 'prom-client'
import { join } from 'lodash-es'
import type { MetricInitializer } from '@/observability/types.js'
export const init: MetricInitializer = (config) => {
const { labelNames, namePrefix, logger } = config
const metric = new prometheusClient.Gauge({
name: join([namePrefix, 'db_max_replication_slots'], '_'),
help: 'Configured value of max_replication_slots for the Postgres database',
labelNames: ['region', ...labelNames]
})
return async (params) => {
const { dbClients, labels } = params
await Promise.all(
dbClients.map(async ({ client, regionKey }) => {
const queryResults = await client.raw<{
rows: [{ max_replication_slots: string }]
}>(`SHOW max_replication_slots;`)
if (!queryResults.rows.length) {
logger.error(
{ region: regionKey },
"No max_replication_slots found for region '{region}'. This is odd."
)
return
}
metric.set(
{ ...labels, region: regionKey },
parseInt(queryResults.rows[0].max_replication_slots)
)
})
)
}
}
@@ -1,33 +0,0 @@
import prometheusClient from 'prom-client'
import { join } from 'lodash-es'
import type { MetricInitializer } from '@/observability/types.js'
export const init: MetricInitializer = (config) => {
const { labelNames, namePrefix, logger } = config
const metric = new prometheusClient.Gauge({
name: join([namePrefix, 'db', 'max_sync_workers_per_subscription'], '_'),
help: 'Configured value of max_sync_workers_per_subscription for the Postgres database',
labelNames: ['region', ...labelNames]
})
return async (params) => {
const { dbClients, labels } = params
await Promise.all(
dbClients.map(async ({ client, regionKey }) => {
const queryResults = await client.raw<{
rows: [{ max_sync_workers_per_subscription: string }]
}>(`SHOW max_sync_workers_per_subscription;`)
if (!queryResults.rows.length) {
logger.error(
{ region: regionKey },
"No max_sync_workers_per_subscription found for region '{region}'. This is odd."
)
return
}
metric.set(
{ ...labels, region: regionKey },
parseInt(queryResults.rows[0].max_sync_workers_per_subscription)
)
})
)
}
}
@@ -1,33 +0,0 @@
import prometheusClient from 'prom-client'
import { join } from 'lodash-es'
import type { MetricInitializer } from '@/observability/types.js'
export const init: MetricInitializer = (config) => {
const { labelNames, namePrefix, logger } = config
const metric = new prometheusClient.Gauge({
name: join([namePrefix, 'db_max_wal_senders'], '_'),
help: 'Configured value of max_wal_senders for the Postgres database',
labelNames: ['region', ...labelNames]
})
return async (params) => {
const { dbClients, labels } = params
await Promise.all(
dbClients.map(async ({ client, regionKey }) => {
const queryResults = await client.raw<{
rows: [{ max_wal_senders: string }]
}>(`SHOW max_wal_senders;`)
if (!queryResults.rows.length) {
logger.error(
{ region: regionKey },
"No max_wal_senders found for region '{region}'. This is odd."
)
return
}
metric.set(
{ ...labels, region: regionKey },
parseInt(queryResults.rows[0].max_wal_senders)
)
})
)
}
}
@@ -1,33 +0,0 @@
import prometheusClient from 'prom-client'
import { join } from 'lodash-es'
import type { MetricInitializer } from '@/observability/types.js'
export const init: MetricInitializer = (config) => {
const { labelNames, namePrefix, logger } = config
const metric = new prometheusClient.Gauge({
name: join([namePrefix, 'db', 'max_worker_processes'], '_'),
help: 'Configured value of max_worker_processes for the Postgres database',
labelNames: ['region', ...labelNames]
})
return async (params) => {
const { dbClients, labels } = params
await Promise.all(
dbClients.map(async ({ client, regionKey }) => {
const queryResults = await client.raw<{
rows: [{ max_worker_processes: string }]
}>(`SHOW max_worker_processes;`)
if (!queryResults.rows.length) {
logger.error(
{ region: regionKey },
"No max_worker_processes found for region '{region}'. This is odd."
)
return
}
metric.set(
{ ...labels, region: regionKey },
parseInt(queryResults.rows[0].max_worker_processes)
)
})
)
}
}
@@ -1,33 +0,0 @@
import prometheusClient from 'prom-client'
import { join } from 'lodash-es'
import type { MetricInitializer } from '@/observability/types.js'
export const init: MetricInitializer = (config) => {
const { labelNames, namePrefix, logger } = config
const metric = new prometheusClient.Gauge({
name: join([namePrefix, 'db_wal_level_is_logical'], '_'),
help: "Indicates whether the value of wal_level for the Postgres database is 'logical'",
labelNames: ['region', ...labelNames]
})
return async (params) => {
const { dbClients, labels } = params
await Promise.all(
dbClients.map(async ({ client, regionKey }) => {
const queryResults = await client.raw<{
rows: [{ wal_level: string }]
}>(`SHOW wal_level;`)
if (!queryResults.rows.length) {
logger.error(
{ region: regionKey },
"No wal_level found for region '{region}'. This is odd."
)
return
}
metric.set(
{ ...labels, region: regionKey },
queryResults.rows[0].wal_level === 'logical' ? 1 : 0
)
})
)
}
}
@@ -1,33 +0,0 @@
import prometheusClient from 'prom-client'
import { join } from 'lodash-es'
import type { MetricInitializer } from '@/observability/types.js'
export const init: MetricInitializer = (config) => {
const { labelNames, namePrefix, logger } = config
const dbWorkers = new prometheusClient.Gauge({
name: join([namePrefix, 'db_workers'], '_'),
help: 'Number of database workers',
labelNames: ['region', ...labelNames]
})
return async (params) => {
const { dbClients, labels } = params
await Promise.all(
dbClients.map(async ({ client, regionKey }) => {
const connectionResults = await client.raw<{
rows: [{ worker_count: string }]
}>(`SELECT COUNT(*) AS worker_count FROM pg_stat_activity;`)
if (!connectionResults.rows.length) {
logger.error(
{ region: regionKey },
"No database workers found for region '{region}'. This is odd."
)
return
}
dbWorkers.set(
{ ...labels, region: regionKey },
parseInt(connectionResults.rows[0].worker_count)
)
})
)
}
}
@@ -1,33 +0,0 @@
import prometheusClient from 'prom-client'
import { join } from 'lodash-es'
import type { MetricInitializer } from '@/observability/types.js'
export const init: MetricInitializer = (config) => {
const { labelNames, namePrefix, logger } = config
const promMetric = new prometheusClient.Gauge({
name: join([namePrefix, 'db_workers_awaiting_locks'], '_'),
help: 'Number of database workers awaiting locks',
labelNames: ['region', ...labelNames]
})
return async (params) => {
const { dbClients, labels } = params
await Promise.all(
dbClients.map(async ({ client, regionKey }) => {
const queryResults = await client.raw<{
rows: [{ count: string }]
}>(`SELECT COUNT(*) FROM pg_stat_activity WHERE wait_event = 'Lock';`)
if (!queryResults.rows.length) {
logger.error(
{ region: regionKey },
"No database workers found for region '{region}'. This is odd."
)
return
}
promMetric.set(
{ ...labels, region: regionKey },
parseInt(queryResults.rows[0].count)
)
})
)
}
}
@@ -1,35 +0,0 @@
import prometheusClient from 'prom-client'
import { join } from 'lodash-es'
import type { MetricInitializer } from '@/observability/types.js'
export const init: MetricInitializer = (config) => {
const { labelNames, namePrefix, logger } = config
const connections = new prometheusClient.Gauge({
name: join([namePrefix, 'db_inactive_replication_slots'], '_'),
help: 'Number of inactive database replication slots',
labelNames: ['region', ...labelNames]
})
return async (params) => {
const { dbClients, labels } = params
await Promise.all(
dbClients.map(async ({ client, regionKey }) => {
const connectionResults = await client.raw<{
rows: [{ inactive_replication_slots: string }]
}>(
`SELECT count(*) AS inactive_replication_slots FROM pg_replication_slots WHERE slot_type = 'logical' AND (slot_name LIKE 'projectsub_%' OR slot_name LIKE 'userssub_%') AND NOT active;`
)
if (!connectionResults.rows.length) {
logger.error(
{ region: regionKey },
"No data related to replication slots found for region '{region}'. This is odd."
)
return
}
connections.set(
{ ...labels, region: regionKey },
parseInt(connectionResults.rows[0].inactive_replication_slots)
)
})
)
}
}
@@ -1,47 +0,0 @@
import prometheusClient from 'prom-client'
import { join } from 'lodash-es'
import type { MetricInitializer } from '@/observability/types.js'
import * as Environment from '@speckle/shared/environment'
const { FF_WORKSPACES_MULTI_REGION_ENABLED } = Environment.getFeatureFlags()
export const init: MetricInitializer = (config) => {
if (!FF_WORKSPACES_MULTI_REGION_ENABLED) {
return async () => {
// Do nothing
}
}
const { labelNames, namePrefix, logger } = config
const promMetric = new prometheusClient.Gauge({
name: join([namePrefix, 'db_replication_slot_lag'], '_'),
help: 'Lag of replication slots in bytes',
labelNames: ['region', 'slotname', ...labelNames]
})
return async (params) => {
const { dbClients, labels } = params
await Promise.all(
dbClients.map(async ({ client, regionKey }) => {
const queryResults = await client.raw<{
rows: [{ slot_name: string; slot_lag_bytes: string }]
}>(`
SELECT slot_name, pg_current_wal_lsn() - confirmed_flush_lsn AS slot_lag_bytes
FROM pg_replication_slots WHERE slot_type='logical';
`)
if (!queryResults.rows.length) {
logger.error(
{ region: regionKey },
"No database replication slots found for region '{region}'. This is odd."
)
return
}
for (const row of queryResults.rows) {
promMetric.set(
{ ...labels, region: regionKey, slotname: row.slot_name },
parseInt(row.slot_lag_bytes)
)
}
})
)
}
}
@@ -1,113 +0,0 @@
import prometheusClient from 'prom-client'
import { join } from 'lodash-es'
import type { MetricInitializer } from '@/observability/types.js'
import * as Environment from '@speckle/shared/environment'
const { FF_WORKSPACES_MULTI_REGION_ENABLED } = Environment.getFeatureFlags()
type QueryResponseSchema = {
rows: [
{
write_lag: string
flush_lag: string
replay_lag: string
subscription_name: string
}
]
}
export const init: MetricInitializer = (config) => {
if (!FF_WORKSPACES_MULTI_REGION_ENABLED) {
return async () => {
// Do nothing
}
}
const { labelNames, namePrefix, logger } = config
const promMetric = new prometheusClient.Gauge({
name: join([namePrefix, 'db_replication_worker_lag'], '_'),
help: 'Lag of replication workers, by type of lag',
labelNames: ['region', 'lagtype', 'subscriptionname', ...labelNames]
})
return async (params) => {
const { dbClients, labels } = params
await Promise.all(
dbClients.map(async ({ client, regionKey }) => {
let queryResults: QueryResponseSchema | undefined = undefined
try {
// sent_lsn is the data that has been received by the worker
// write_lsn is the data that has been written to the WAL, but it may be cached and not yet flushed to disk
// flush_lsn is the data that has been flushed to disk
// replay_lsn is the data that is visible to the user (may differ from flush_lsn if there is a replication conflict)
queryResults = await client.raw<QueryResponseSchema>(`
SELECT sent_lsn - write_lsn AS write_lag,
write_lsn - flush_lsn AS flush_lag,
flush_lsn - replay_lsn AS replay_lag,
application_name AS subscription_name
FROM aiven_extras.pg_stat_replication_list();
`)
} catch (err) {
if (
err instanceof Error &&
err.message.includes('schema "aiven_extras" does not exist')
) {
logger.warn(
{ err, region: regionKey },
"'aiven_extras' extension is not yet enabled for region '{region}'."
)
return // continue to next region
}
//else rethrow
throw err
}
if (!queryResults?.rows.length) {
logger.error(
{ region: regionKey },
"No database workers found for region '{region}'. This is odd."
)
return
}
for (const row of queryResults.rows) {
const writeLag = parseInt(row.write_lag)
if (!isNaN(writeLag)) {
promMetric.set(
{
...labels,
region: regionKey,
lagtype: 'write',
subscriptionname: row.subscription_name
},
parseInt(row.write_lag)
)
}
const flushLag = parseInt(row.flush_lag)
if (!isNaN(flushLag)) {
promMetric.set(
{
...labels,
region: regionKey,
lagtype: 'flush',
subscriptionname: row.subscription_name
},
parseInt(row.flush_lag)
)
}
const replayLag = parseInt(row.replay_lag)
if (!isNaN(replayLag)) {
promMetric.set(
{
...labels,
region: regionKey,
lagtype: 'replay',
subscriptionname: row.subscription_name
},
parseInt(row.replay_lag)
)
}
}
})
)
}
}
@@ -1,65 +0,0 @@
import prometheusClient from 'prom-client'
import { join } from 'lodash-es'
import type { MetricInitializer } from '@/observability/types.js'
import * as Environment from '@speckle/shared/environment'
type QueryResponseSchema = {
rows: [{ subname: string; subenabled: boolean }]
}
const { FF_WORKSPACES_MULTI_REGION_ENABLED } = Environment.getFeatureFlags()
export const init: MetricInitializer = (config) => {
if (!FF_WORKSPACES_MULTI_REGION_ENABLED) {
return async () => {
// Do nothing
}
}
const { labelNames, namePrefix, logger } = config
const promMetric = new prometheusClient.Gauge({
name: join([namePrefix, 'db_subscriptions_enabled'], '_'),
help: 'Enabled subscriptions to other databases',
labelNames: ['region', 'subscriptionname', ...labelNames]
})
return async (params) => {
const { dbClients, labels } = params
await Promise.all(
dbClients.map(async ({ client, regionKey }) => {
let queryResults: QueryResponseSchema | undefined = undefined
try {
queryResults = await client.raw<QueryResponseSchema>(`
SELECT subname, subenabled FROM aiven_extras.pg_list_all_subscriptions();
`)
} catch (err) {
if (
err instanceof Error &&
err.message.includes('schema "aiven_extras" does not exist')
) {
logger.warn(
{ err, region: regionKey },
"'aiven_extras' extension is not yet enabled for region '{region}'."
)
return // continue to next region
}
//else rethrow
throw err
}
if (!queryResults?.rows.length) {
logger.error(
{ region: regionKey },
"No database replication slots found for region '{region}'. This is odd."
)
return
}
for (const row of queryResults.rows) {
promMetric.set(
{ ...labels, region: regionKey, subscriptionname: row.subname },
row.subenabled ? 1 : 0
)
}
})
)
}
}
@@ -5,27 +5,15 @@ import { join } from 'lodash-es'
import { Counter, Histogram, Registry } from 'prom-client'
import prometheusClient from 'prom-client'
import { init as commits } from '@/observability/metrics/commits.js'
import { init as dbMaxLogicalReplicationWorkers } from '@/observability/metrics/dbMaxLogicalReplicationWorkers.js'
import { init as dbMaxPerparedTransactions } from '@/observability/metrics/dbMaxPerparedTransactions.js'
import { init as dbMaxReplicationSlots } from '@/observability/metrics/dbMaxReplicationSlots.js'
import { init as dbMaxSyncWorkersPerSubscription } from '@/observability/metrics/dbMaxSyncWorkersPerSubscription.js'
import { init as dbMaxWalSenders } from '@/observability/metrics/dbMaxWalSenders.js'
import { init as dbMaxWorkerProcesses } from '@/observability/metrics/dbMaxWorkerProcesses.js'
import { init as dbPreparedTransactions } from '@/observability/metrics/dbPreparedTransactions.js'
import { init as dbSize } from '@/observability/metrics/dbSize.js'
import { init as dbWalLevel } from '@/observability/metrics/dbWalLevel.js'
import { init as dbWorkers } from '@/observability/metrics/dbWorkers.js'
import { init as dbWorkersAwaitingLocks } from '@/observability/metrics/dbWorkersAwaitingLocks.js'
import { init as fileImports } from '@/observability/metrics/fileImports.js'
import { init as fileSize } from '@/observability/metrics/fileSize.js'
import { init as inactiveReplicationSlots } from '@/observability/metrics/inactiveReplicationSlots.js'
import { init as maxConnections } from '@/observability/metrics/maxConnections.js'
import { init as objects } from '@/observability/metrics/objects.js'
import { init as previews } from '@/observability/metrics/previews.js'
import { init as replicationSlotLag } from '@/observability/metrics/replicationSlotLag.js'
import { init as replicationWorkerLag } from '@/observability/metrics/replicationWorkerLag.js'
import { init as streams } from '@/observability/metrics/streams.js'
import { init as subscriptionsEnabled } from '@/observability/metrics/subscriptionsEnabled.js'
import { init as tablesize } from '@/observability/metrics/tableSize.js'
import { init as users } from '@/observability/metrics/users.js'
import { init as webhooks } from '@/observability/metrics/webhooks.js'
@@ -63,27 +51,15 @@ function initMonitoringMetrics(params: {
const metricsToInitialize = [
commits,
dbMaxLogicalReplicationWorkers,
dbMaxPerparedTransactions,
dbMaxReplicationSlots,
dbMaxSyncWorkersPerSubscription,
dbMaxWalSenders,
dbMaxWorkerProcesses,
dbPreparedTransactions,
dbWalLevel,
dbSize,
dbWorkers,
dbWorkersAwaitingLocks,
fileImports,
fileSize,
inactiveReplicationSlots,
maxConnections,
objects,
previews,
replicationSlotLag,
replicationWorkerLag,
streams,
subscriptionsEnabled,
tablesize,
users,
webhooks