Revert "feat(multi-region): metrics for knex for all regional databases (#3508)" (#3534)

This reverts commit f51eb91861.
This commit is contained in:
Iain Sproat
2024-11-21 11:14:04 +00:00
committed by GitHub
parent f51eb91861
commit 2a7c51f3df
7 changed files with 254 additions and 348 deletions
+2 -2
View File
@@ -67,7 +67,7 @@ import { GraphQLError } from 'graphql'
import { redactSensitiveVariables } from '@/logging/loggingHelper'
import { buildMocksConfig } from '@/modules/mocks'
import { defaultErrorHandler } from '@/modules/core/rest/defaultErrorHandler'
import { migrateDbToLatestFactory } from '@/db/migrations'
import { migrateDbToLatest } from '@/db/migrations'
import { statusCodePlugin } from '@/modules/core/graph/plugins/statusCode'
import { BaseError, ForbiddenError } from '@/modules/shared/errors'
import { loggingPlugin } from '@/modules/core/graph/plugins/logging'
@@ -364,7 +364,7 @@ export async function init() {
// Moves things along automatically on restart.
// Should perhaps be done manually?
await migrateDbToLatestFactory({ region: 'main', db: knex })()
await migrateDbToLatest(knex)()
app.use(cookieParser())
app.use(DetermineRequestIdMiddleware)
+3 -13
View File
@@ -1,15 +1,5 @@
import { Knex } from 'knex'
import { logger } from '@/logging/logging'
export const migrateDbToLatestFactory =
(params: { db: Knex; region: string }) => async () => {
const { db, region } = params
try {
await db.migrate.latest()
} catch (err: unknown) {
logger.error(
{ err, region },
'Error migrating db to latest for region "{region}".'
)
}
}
export const migrateDbToLatest = (db: Knex) => async () => {
await db.migrate.latest()
}
@@ -14,7 +14,7 @@ type MetricConfig = {
prefix?: string
labels?: Record<string, string>
buckets?: Record<string, number[]>
getDbClients: () => Promise<Record<string, Knex>>
knex: Knex
}
type HighFrequencyMonitor = {
@@ -31,7 +31,7 @@ type MetricConfig = {
prefix?: string
labels?: Record<string, string>
buckets?: Record<BucketName, number[]>
getDbClients: () => Promise<Record<string, Knex>>
knex: Knex
}
export const knexConnections = (registry: Registry, config: MetricConfig): Metric => {
@@ -40,6 +40,7 @@ export const knexConnections = (registry: Registry, config: MetricConfig): Metri
const labels = config.labels ?? {}
const labelNames = Object.keys(labels)
const buckets = { ...DEFAULT_KNEX_TOTAL_BUCKETS, ...config.buckets }
const knex = config.knex
const knexConnectionsFree = new Histogram({
name: namePrefix + KNEX_CONNECTIONS_FREE,
@@ -91,20 +92,14 @@ export const knexConnections = (registry: Registry, config: MetricConfig): Metri
return {
collect: () => {
for (const [region, knex] of Object.entries(config.getDbClients())) {
const labelsAndRegion = { ...labels, region }
const connPool = knex.client.pool
const connPool = knex.client.pool
knexConnectionsFree.observe(labelsAndRegion, connPool.numFree())
knexConnectionsUsed.observe(labelsAndRegion, connPool.numUsed())
knexPendingAcquires.observe(labelsAndRegion, connPool.numPendingAcquires())
knexPendingCreates.observe(labelsAndRegion, connPool.numPendingCreates())
knexPendingValidations.observe(
labelsAndRegion,
connPool.numPendingValidations()
)
knexRemainingCapacity.observe(labelsAndRegion, numberOfFreeConnections(knex))
}
knexConnectionsFree.observe(labels, connPool.numFree())
knexConnectionsUsed.observe(labels, connPool.numUsed())
knexPendingAcquires.observe(labels, connPool.numPendingAcquires())
knexPendingCreates.observe(labels, connPool.numPendingCreates())
knexPendingValidations.observe(labels, connPool.numPendingValidations())
knexRemainingCapacity.observe(labels, numberOfFreeConnections(knex))
}
}
}
+3 -3
View File
@@ -4,10 +4,10 @@ import promBundle from 'express-prom-bundle'
import { initKnexPrometheusMetrics } from '@/logging/knexMonitoring'
import { initHighFrequencyMonitoring } from '@/logging/highFrequencyMetrics/highfrequencyMonitoring'
import knex from '@/db/knex'
import { highFrequencyMetricsCollectionPeriodMs } from '@/modules/shared/helpers/envHelper'
import { startupLogger as logger } from '@/logging/logging'
import type express from 'express'
import { getAllClients } from '@/modules/multiregion/dbSelector'
let prometheusInitialized = false
@@ -24,14 +24,14 @@ export default function (app: express.Express) {
register: prometheusClient.register,
collectionPeriodMilliseconds: highFrequencyMetricsCollectionPeriodMs(),
config: {
getDbClients: getAllClients
knex
}
})
highfrequencyMonitoring.start()
initKnexPrometheusMetrics({
register: prometheusClient.register,
getAllDbClients: getAllClients,
db: knex,
logger
})
const expressMetricsMiddleware = promBundle({
+234 -293
View File
@@ -4,146 +4,11 @@ import { type Knex } from 'knex'
import { Logger } from 'pino'
import { toNDecimalPlaces } from '@/modules/core/utils/formatting'
import { omit } from 'lodash'
import { Entries } from 'type-fest'
let metricQueryDuration: prometheusClient.Summary<string>
let metricQueryErrors: prometheusClient.Counter<string>
let metricConnectionAcquisitionDuration: prometheusClient.Histogram<string>
let metricConnectionPoolErrors: prometheusClient.Counter<string>
let metricConnectionInUseDuration: prometheusClient.Histogram<string>
let metricConnectionPoolReapingDuration: prometheusClient.Histogram<string>
let alreadyInitialized = false
export const initKnexPrometheusMetrics = (params: {
getAllDbClients: () => Promise<Record<string, Knex>>
db: Knex
register: Registry
logger: Logger
}) => {
if (alreadyInitialized) return
alreadyInitialized = true
new prometheusClient.Gauge({
registers: [params.register],
name: 'speckle_server_knex_free',
labelNames: ['region'],
help: 'Number of free DB connections',
collect() {
for (const [region, db] of Object.entries(params.getAllDbClients())) {
this.set({ region }, db.client.pool.numFree())
}
}
})
new prometheusClient.Gauge({
registers: [params.register],
name: 'speckle_server_knex_used',
labelNames: ['region'],
help: 'Number of used DB connections',
collect() {
for (const [region, db] of Object.entries(params.getAllDbClients())) {
this.set({ region }, db.client.pool.numUsed())
}
}
})
new prometheusClient.Gauge({
registers: [params.register],
name: 'speckle_server_knex_pending',
labelNames: ['region'],
help: 'Number of pending DB connection aquires',
collect() {
for (const [region, db] of Object.entries(params.getAllDbClients())) {
this.set({ region }, db.client.pool.numPendingAcquires())
}
}
})
new prometheusClient.Gauge({
registers: [params.register],
name: 'speckle_server_knex_pending_creates',
labelNames: ['region'],
help: 'Number of pending DB connection creates',
collect() {
for (const [region, db] of Object.entries(params.getAllDbClients())) {
this.set({ region }, db.client.pool.numPendingCreates())
}
}
})
new prometheusClient.Gauge({
registers: [params.register],
name: 'speckle_server_knex_pending_validations',
labelNames: ['region'],
help: 'Number of pending DB connection validations. This is a state between pending acquisition and acquiring a connection.',
collect() {
for (const [region, db] of Object.entries(params.getAllDbClients())) {
this.set({ region }, db.client.pool.numPendingValidations())
}
}
})
new prometheusClient.Gauge({
registers: [params.register],
name: 'speckle_server_knex_remaining_capacity',
labelNames: ['region'],
help: 'Remaining capacity of the DB connection pool',
collect() {
for (const [region, db] of Object.entries(params.getAllDbClients())) {
this.set({ region }, numberOfFreeConnections(db))
}
}
})
metricQueryDuration = new prometheusClient.Summary({
registers: [params.register],
labelNames: ['sqlMethod', 'sqlNumberBindings', 'region'],
name: 'speckle_server_knex_query_duration',
help: 'Summary of the DB query durations in seconds'
})
metricQueryErrors = new prometheusClient.Counter({
registers: [params.register],
labelNames: ['sqlMethod', 'sqlNumberBindings', 'region'],
name: 'speckle_server_knex_query_errors',
help: 'Number of DB queries with errors'
})
metricConnectionAcquisitionDuration = new prometheusClient.Histogram({
registers: [params.register],
name: 'speckle_server_knex_connection_acquisition_duration',
labelNames: ['region'],
help: 'Summary of the DB connection acquisition duration, from request to acquire connection from pool until successfully acquired, in seconds'
})
metricConnectionPoolErrors = new prometheusClient.Counter({
registers: [params.register],
name: 'speckle_server_knex_connection_acquisition_errors',
labelNames: ['region'],
help: 'Number of DB connection pool acquisition errors'
})
metricConnectionInUseDuration = new prometheusClient.Histogram({
registers: [params.register],
name: 'speckle_server_knex_connection_usage_duration',
labelNames: ['region'],
help: 'Summary of the DB connection duration, from successful acquisition of connection from pool until release back to pool, in seconds'
})
metricConnectionPoolReapingDuration = new prometheusClient.Histogram({
registers: [params.register],
name: 'speckle_server_knex_connection_pool_reaping_duration',
labelNames: ['region'],
help: 'Summary of the DB connection pool reaping duration, in seconds. Reaping is the process of removing idle connections from the pool.'
})
updateKnexPrometheusMetrics({
...params
})
}
export const updateKnexPrometheusMetrics = async (params: {
getAllDbClients: () => Promise<Record<string, Knex>>
logger: Logger
}) => {
const normalizeSqlMethod = (sqlMethod: string) => {
if (!sqlMethod) return 'unknown'
@@ -155,178 +20,254 @@ export const updateKnexPrometheusMetrics = async (params: {
}
}
interface QueryEvent extends Knex.Sql {
__knexUid: string
__knexTxId: string
__knexQueryUid: string
}
const queryStartTime: Record<string, number> = {}
const connectionAcquisitionStartTime: Record<string, number> = {}
const connectionInUseStartTime: Record<string, number> = {}
new prometheusClient.Gauge({
registers: [params.register],
name: 'speckle_server_knex_free',
help: 'Number of free DB connections',
collect() {
this.set(params.db.client.pool.numFree())
}
})
new prometheusClient.Gauge({
registers: [params.register],
name: 'speckle_server_knex_used',
help: 'Number of used DB connections',
collect() {
this.set(params.db.client.pool.numUsed())
}
})
new prometheusClient.Gauge({
registers: [params.register],
name: 'speckle_server_knex_pending',
help: 'Number of pending DB connection aquires',
collect() {
this.set(params.db.client.pool.numPendingAcquires())
}
})
new prometheusClient.Gauge({
registers: [params.register],
name: 'speckle_server_knex_pending_creates',
help: 'Number of pending DB connection creates',
collect() {
this.set(params.db.client.pool.numPendingCreates())
}
})
new prometheusClient.Gauge({
registers: [params.register],
name: 'speckle_server_knex_pending_validations',
help: 'Number of pending DB connection validations. This is a state between pending acquisition and acquiring a connection.',
collect() {
this.set(params.db.client.pool.numPendingValidations())
}
})
new prometheusClient.Gauge({
registers: [params.register],
name: 'speckle_server_knex_remaining_capacity',
help: 'Remaining capacity of the DB connection pool',
collect() {
this.set(numberOfFreeConnections(params.db))
}
})
const metricQueryDuration = new prometheusClient.Summary({
registers: [params.register],
labelNames: ['sqlMethod', 'sqlNumberBindings'],
name: 'speckle_server_knex_query_duration',
help: 'Summary of the DB query durations in seconds'
})
const metricQueryErrors = new prometheusClient.Counter({
registers: [params.register],
labelNames: ['sqlMethod', 'sqlNumberBindings'],
name: 'speckle_server_knex_query_errors',
help: 'Number of DB queries with errors'
})
const metricConnectionAcquisitionDuration = new prometheusClient.Histogram({
registers: [params.register],
name: 'speckle_server_knex_connection_acquisition_duration',
help: 'Summary of the DB connection acquisition duration, from request to acquire connection from pool until successfully acquired, in seconds'
})
const metricConnectionPoolErrors = new prometheusClient.Counter({
registers: [params.register],
name: 'speckle_server_knex_connection_acquisition_errors',
help: 'Number of DB connection pool acquisition errors'
})
const metricConnectionInUseDuration = new prometheusClient.Histogram({
registers: [params.register],
name: 'speckle_server_knex_connection_usage_duration',
help: 'Summary of the DB connection duration, from successful acquisition of connection from pool until release back to pool, in seconds'
})
const metricConnectionPoolReapingDuration = new prometheusClient.Histogram({
registers: [params.register],
name: 'speckle_server_knex_connection_pool_reaping_duration',
help: 'Summary of the DB connection pool reaping duration, in seconds. Reaping is the process of removing idle connections from the pool.'
})
// configure hooks on knex
for (const [region, db] of Object.entries(
params.getAllDbClients()
) as Entries<Knex>) {
const queryStartTime: Record<string, number> = {}
const connectionAcquisitionStartTime: Record<string, number> = {}
const connectionInUseStartTime: Record<string, number> = {}
db.on('query', (data: QueryEvent) => {
const queryId = data.__knexQueryUid + ''
queryStartTime[queryId] = performance.now()
})
params.db.on('query', (data) => {
const queryId = data.__knexQueryUid + ''
queryStartTime[queryId] = performance.now()
})
db.on('query-response', (_response: unknown, data: QueryEvent) => {
const queryId = data.__knexQueryUid + ''
const durationMs = performance.now() - queryStartTime[queryId]
const durationSec = toNDecimalPlaces(durationMs / 1000, 2)
delete queryStartTime[queryId]
if (!isNaN(durationSec))
metricQueryDuration
.labels({
region,
sqlMethod: normalizeSqlMethod(data.method),
sqlNumberBindings: data.bindings?.length || -1
})
.observe(durationSec)
params.logger.debug(
{
region,
sql: data.sql,
sqlMethod: normalizeSqlMethod(data.method),
sqlQueryId: queryId,
sqlQueryDurationMs: toNDecimalPlaces(durationMs, 0),
sqlNumberBindings: data.bindings?.length || -1
},
"DB query successfully completed, for method '{sqlMethod}', after {sqlQueryDurationMs}ms"
)
})
params.db.on('query-response', (_data, querySpec) => {
const queryId = querySpec.__knexQueryUid + ''
const durationMs = performance.now() - queryStartTime[queryId]
const durationSec = toNDecimalPlaces(durationMs / 1000, 2)
delete queryStartTime[queryId]
if (!isNaN(durationSec))
metricQueryDuration
.labels({
sqlMethod: normalizeSqlMethod(querySpec.method),
sqlNumberBindings: querySpec.bindings?.length || -1
})
.observe(durationSec)
params.logger.debug(
{
sql: querySpec.sql,
sqlMethod: normalizeSqlMethod(querySpec.method),
sqlQueryId: queryId,
sqlQueryDurationMs: toNDecimalPlaces(durationMs, 0),
sqlNumberBindings: querySpec.bindings?.length || -1
},
"DB query successfully completed, for method '{sqlMethod}', after {sqlQueryDurationMs}ms"
)
})
db.on('query-error', (err: unknown, data: QueryEvent) => {
const queryId = data.__knexQueryUid + ''
const durationMs = performance.now() - queryStartTime[queryId]
const durationSec = toNDecimalPlaces(durationMs / 1000, 2)
delete queryStartTime[queryId]
params.db.on('query-error', (err, querySpec) => {
const queryId = querySpec.__knexQueryUid + ''
const durationMs = performance.now() - queryStartTime[queryId]
const durationSec = toNDecimalPlaces(durationMs / 1000, 2)
delete queryStartTime[queryId]
if (!isNaN(durationSec))
metricQueryDuration
.labels({
region,
sqlMethod: normalizeSqlMethod(data.method),
sqlNumberBindings: data.bindings?.length || -1
})
.observe(durationSec)
metricQueryErrors.inc()
params.logger.warn(
{
err: typeof err === 'object' ? omit(err, 'detail') : err,
region,
sql: data.sql,
sqlMethod: normalizeSqlMethod(data.method),
sqlQueryId: queryId,
sqlQueryDurationMs: toNDecimalPlaces(durationMs, 0),
sqlNumberBindings: data.bindings?.length || -1
},
'DB query errored for {sqlMethod} after {sqlQueryDurationMs}ms'
)
})
if (!isNaN(durationSec))
metricQueryDuration
.labels({
sqlMethod: normalizeSqlMethod(querySpec.method),
sqlNumberBindings: querySpec.bindings?.length || -1
})
.observe(durationSec)
metricQueryErrors.inc()
params.logger.warn(
{
err: omit(err, 'detail'),
sql: querySpec.sql,
sqlMethod: normalizeSqlMethod(querySpec.method),
sqlQueryId: queryId,
sqlQueryDurationMs: toNDecimalPlaces(durationMs, 0),
sqlNumberBindings: querySpec.bindings?.length || -1
},
'DB query errored for {sqlMethod} after {sqlQueryDurationMs}ms'
)
})
const pool = db.client.pool
const pool = params.db.client.pool
// configure hooks on knex connection pool
pool.on('acquireRequest', (eventId: number) => {
connectionAcquisitionStartTime[eventId] = performance.now()
// params.logger.debug(
// {
// eventId
// },
// 'DB connection acquisition request occurred.'
// )
})
pool.on('acquireSuccess', (eventId: number, resource: unknown) => {
const now = performance.now()
const durationMs = now - connectionAcquisitionStartTime[eventId]
delete connectionAcquisitionStartTime[eventId]
if (!isNaN(durationMs))
metricConnectionAcquisitionDuration.labels({ region }).observe(durationMs)
// configure hooks on knex connection pool
pool.on('acquireRequest', (eventId: number) => {
connectionAcquisitionStartTime[eventId] = performance.now()
// params.logger.debug(
// {
// eventId
// },
// 'DB connection acquisition request occurred.'
// )
})
pool.on('acquireSuccess', (eventId: number, resource: unknown) => {
const now = performance.now()
const durationMs = now - connectionAcquisitionStartTime[eventId]
delete connectionAcquisitionStartTime[eventId]
if (!isNaN(durationMs)) metricConnectionAcquisitionDuration.observe(durationMs)
// successful acquisition is the start of usage, so record that start time
let knexUid: string | undefined = undefined
if (resource && typeof resource === 'object' && '__knexUid' in resource) {
const _knexUid = resource['__knexUid']
if (_knexUid && typeof _knexUid === 'string') {
knexUid = _knexUid
connectionInUseStartTime[knexUid] = now
}
// successful acquisition is the start of usage, so record that start time
let knexUid: string | undefined = undefined
if (resource && typeof resource === 'object' && '__knexUid' in resource) {
const _knexUid = resource['__knexUid']
if (_knexUid && typeof _knexUid === 'string') {
knexUid = _knexUid
connectionInUseStartTime[knexUid] = now
}
}
// params.logger.debug(
// {
// eventId,
// knexUid,
// connectionAcquisitionDurationMs: toNDecimalPlaces(durationMs, 0)
// },
// 'DB connection (knexUid: {knexUid}) acquired after {connectionAcquisitionDurationMs}ms'
// )
})
pool.on('acquireFail', (eventId: number, err: unknown) => {
const now = performance.now()
const durationMs = now - connectionAcquisitionStartTime[eventId]
delete connectionAcquisitionStartTime[eventId]
metricConnectionPoolErrors.inc()
params.logger.warn(
{
err,
eventId,
connectionAcquisitionDurationMs: toNDecimalPlaces(durationMs, 0)
},
'DB connection acquisition failed after {connectionAcquisitionDurationMs}ms'
)
})
// params.logger.debug(
// {
// eventId,
// knexUid,
// connectionAcquisitionDurationMs: toNDecimalPlaces(durationMs, 0)
// },
// 'DB connection (knexUid: {knexUid}) acquired after {connectionAcquisitionDurationMs}ms'
// )
})
pool.on('acquireFail', (eventId: number, err: unknown) => {
const now = performance.now()
const durationMs = now - connectionAcquisitionStartTime[eventId]
delete connectionAcquisitionStartTime[eventId]
metricConnectionPoolErrors.inc()
params.logger.warn(
{
err,
eventId,
connectionAcquisitionDurationMs: toNDecimalPlaces(durationMs, 0)
},
'DB connection acquisition failed after {connectionAcquisitionDurationMs}ms'
)
})
// resource returned to pool
pool.on('release', (resource: unknown) => {
if (!(resource && typeof resource === 'object' && '__knexUid' in resource)) return
const knexUid = resource['__knexUid']
if (!knexUid || typeof knexUid !== 'string') return
// resource returned to pool
pool.on('release', (resource: unknown) => {
if (!(resource && typeof resource === 'object' && '__knexUid' in resource)) return
const knexUid = resource['__knexUid']
if (!knexUid || typeof knexUid !== 'string') return
const now = performance.now()
const durationMs = now - connectionInUseStartTime[knexUid]
if (!isNaN(durationMs))
metricConnectionInUseDuration.labels({ region }).observe(durationMs)
// params.logger.debug(
// {
// knexUid,
// connectionInUseDurationMs: toNDecimalPlaces(durationMs, 0)
// },
// 'DB connection (knexUid: {knexUid}) released after {connectionInUseDurationMs}ms'
// )
})
const now = performance.now()
const durationMs = now - connectionInUseStartTime[knexUid]
if (!isNaN(durationMs)) metricConnectionInUseDuration.observe(durationMs)
// params.logger.debug(
// {
// knexUid,
// connectionInUseDurationMs: toNDecimalPlaces(durationMs, 0)
// },
// 'DB connection (knexUid: {knexUid}) released after {connectionInUseDurationMs}ms'
// )
})
// resource was created and added to the pool
// pool.on('createRequest', (eventId) => {})
// pool.on('createSuccess', (eventId, resource) => {})
// pool.on('createFail', (eventId, err) => {})
// resource was created and added to the pool
// pool.on('createRequest', (eventId) => {})
// pool.on('createSuccess', (eventId, resource) => {})
// pool.on('createFail', (eventId, err) => {})
// resource is destroyed and evicted from pool
// resource may or may not be invalid when destroySuccess / destroyFail is called
// pool.on('destroyRequest', (eventId, resource) => {})
// pool.on('destroySuccess', (eventId, resource) => {})
// pool.on('destroyFail', (eventId, resource, err) => {})
// resource is destroyed and evicted from pool
// resource may or may not be invalid when destroySuccess / destroyFail is called
// pool.on('destroyRequest', (eventId, resource) => {})
// pool.on('destroySuccess', (eventId, resource) => {})
// pool.on('destroyFail', (eventId, resource, err) => {})
// when internal reaping event clock is activated / deactivated
let reapingStartTime: number | undefined = undefined
pool.on('startReaping', () => {
reapingStartTime = performance.now()
})
pool.on('stopReaping', () => {
if (!reapingStartTime) return
const durationMs = performance.now() - reapingStartTime
if (!isNaN(durationMs))
metricConnectionPoolReapingDuration.labels({ region }).observe(durationMs)
reapingStartTime = undefined
})
// when internal reaping event clock is activated / deactivated
let reapingStartTime: number | undefined = undefined
pool.on('startReaping', () => {
reapingStartTime = performance.now()
})
pool.on('stopReaping', () => {
if (!reapingStartTime) return
const durationMs = performance.now() - reapingStartTime
if (!isNaN(durationMs)) metricConnectionPoolReapingDuration.observe(durationMs)
reapingStartTime = undefined
})
// pool is destroyed (after poolDestroySuccess all event handlers are also cleared)
// pool.on('poolDestroyRequest', (eventId) => {})
// pool.on('poolDestroySuccess', (eventId) => {})
}
// pool is destroyed (after poolDestroySuccess all event handlers are also cleared)
// pool.on('poolDestroyRequest', (eventId) => {})
// pool.on('poolDestroySuccess', (eventId) => {})
}
@@ -24,9 +24,6 @@ import {
} from '@/modules/multiregion/regionConfig'
import { MaybeNullOrUndefined } from '@speckle/shared'
import { isTestEnv } from '@/modules/shared/helpers/envHelper'
import { updateKnexPrometheusMetrics } from '@/logging/knexMonitoring'
import { logger } from '@/logging/logging'
import { migrateDbToLatestFactory } from '@/db/migrations'
let getter: GetProjectDb | undefined = undefined
@@ -116,11 +113,7 @@ export const initializeRegisteredRegionClients = async (): Promise<RegionClients
)
// run migrations
await Promise.all(
Object.entries(ret).map(([region, db]) =>
migrateDbToLatestFactory({ db, region })()
)
)
await Promise.all(Object.values(ret).map((db) => db.migrate.latest()))
// (re-)set up pub-sub, if needed
await Promise.all(
@@ -137,14 +130,6 @@ export const getRegisteredRegionClients = async (): Promise<RegionClients> => {
return registeredRegionClients
}
/**
* @description This is the main db + all the region dbs
*/
export const getAllClients = async (): Promise<RegionClients> => {
const regionClients = await getRegisteredRegionClients()
return { main: db, ...regionClients }
}
export const getRegisteredDbClients = async (): Promise<Knex[]> =>
Object.values(await getRegisteredRegionClients())
@@ -184,11 +169,6 @@ export const initializeRegion: InitializeRegion = async ({ regionKey }) => {
if (registeredRegionClients) {
registeredRegionClients[regionKey] = regionDb.public
}
updateKnexPrometheusMetrics({
getAllDbClients: getAllClients,
logger
})
}
interface ReplicationArgs {
@@ -224,7 +204,7 @@ const setUpUserReplication = async ({
const rawSqeel = `SELECT * FROM aiven_extras.pg_create_subscription(
'${subName}',
'dbname=${fromDbName} host=${fromUrl.hostname} port=${port} sslmode=${sslmode} user=${fromUrl.username} password=${fromUrl.password}',
'${pubName}',
'${pubName}',
'${subName}',
TRUE,
TRUE