feat: added schedule for deleting stale prepared transactions (#5169)

This commit is contained in:
Daniel Gak Anagrov
2025-08-12 16:23:27 +02:00
committed by GitHub
parent 1789e36813
commit ba8a62dd2a
8 changed files with 145 additions and 2 deletions
+3
View File
@@ -16,6 +16,7 @@ services:
- ./setup/db/11-docker_postgres_keycloack_init.sql:/docker-entrypoint-initdb.d/11-docker_postgres_keycloack_init.sql - ./setup/db/11-docker_postgres_keycloack_init.sql:/docker-entrypoint-initdb.d/11-docker_postgres_keycloack_init.sql
ports: ports:
- '127.0.0.1:5432:5432' - '127.0.0.1:5432:5432'
command: postgres -c max_prepared_transactions=150
postgres-region1: postgres-region1:
build: build:
@@ -32,6 +33,7 @@ services:
- ./setup/db/11-docker_postgres_keycloack_init.sql:/docker-entrypoint-initdb.d/11-docker_postgres_keycloack_init.sql - ./setup/db/11-docker_postgres_keycloack_init.sql:/docker-entrypoint-initdb.d/11-docker_postgres_keycloack_init.sql
ports: ports:
- '127.0.0.1:5401:5432' - '127.0.0.1:5401:5432'
command: postgres -c max_prepared_transactions=150
postgres-region2: postgres-region2:
build: build:
@@ -48,6 +50,7 @@ services:
- ./setup/db/11-docker_postgres_keycloack_init.sql:/docker-entrypoint-initdb.d/11-docker_postgres_keycloack_init.sql - ./setup/db/11-docker_postgres_keycloack_init.sql:/docker-entrypoint-initdb.d/11-docker_postgres_keycloack_init.sql
ports: ports:
- '127.0.0.1:5402:5432' - '127.0.0.1:5402:5432'
command: postgres -c max_prepared_transactions=150
redis: redis:
image: 'valkey/valkey:8.1-alpine' image: 'valkey/valkey:8.1-alpine'
@@ -13,3 +13,9 @@ export type ProjectRegion = {
projectId: string projectId: string
regionKey: RegionKey regionKey: RegionKey
} }
export type StalePendingTransaction = {
transaction: string
gid: string
prepared: Date
}
@@ -14,6 +14,10 @@ import {
shutdownQueue, shutdownQueue,
startQueue startQueue
} from '@/modules/multiregion/services/queue' } from '@/modules/multiregion/services/queue'
import { scheduleStalePreparedTransactionCleanup } from '@/modules/multiregion/tasks/pendingTransactions'
import type cron from 'node-cron'
let scheduledTasks: cron.ScheduledTask[] = []
const multiRegion: SpeckleModule = { const multiRegion: SpeckleModule = {
async init({ isInitial }) { async init({ isInitial }) {
@@ -38,10 +42,14 @@ const multiRegion: SpeckleModule = {
if (isInitial) { if (isInitial) {
await initializeQueue() await initializeQueue()
await startQueue() await startQueue()
scheduledTasks = [await scheduleStalePreparedTransactionCleanup()]
} }
}, },
async shutdown() { async shutdown() {
await shutdownQueue() await shutdownQueue()
scheduledTasks.forEach((task) => {
task.stop()
})
} }
} }
@@ -0,0 +1,13 @@
import type { StalePendingTransaction } from '@/modules/multiregion/domain/types'
import type { Knex } from 'knex'
export const getStalePreparedTransactionsFactory =
({ db }: { db: Knex }) =>
async (args: { interval?: string }): Promise<StalePendingTransaction[]> => {
const { interval = '2 minutes' } = args
return (
await db.raw<{ rows: StalePendingTransaction[] }>(
`SELECT * FROM pg_prepared_xacts WHERE prepared < NOW() - INTERVAL '${interval}';`
)
).rows
}
@@ -0,0 +1,54 @@
import { scheduleExecutionFactory } from '@/modules/core/services/taskScheduler'
import {
acquireTaskLockFactory,
releaseTaskLockFactory
} from '@/modules/core/repositories/scheduledTasks'
import { db } from '@/db/knex'
import type { Logger } from '@/observability/logging'
import type { Knex } from 'knex'
import { getAllRegisteredDbClients } from '@/modules/multiregion/utils/dbSelector'
import { getStalePreparedTransactionsFactory } from '@/modules/multiregion/repositories/transactions'
import { rollbackPreparedTransaction } from '@/modules/shared/helpers/dbHelper'
const rollbackStalePreparedTransactions = async ({
allRegions,
logger
}: {
allRegions: { client: Knex; regionKey: string }[]
logger: Logger
}): Promise<void> => {
for (const { regionKey, client } of allRegions) {
const getStalePreparedTransactions = getStalePreparedTransactionsFactory({
db: client
})
const pendingTransactions = await getStalePreparedTransactions({})
if (!pendingTransactions.length) continue
logger.error(
{ pendingTransactions, regionKey },
'Found stale prepared transactions.'
)
await Promise.allSettled(
pendingTransactions.map(({ gid }) => rollbackPreparedTransaction(client, gid))
)
}
}
export const scheduleStalePreparedTransactionCleanup = async () => {
const allRegions = await getAllRegisteredDbClients()
const scheduleExecution = scheduleExecutionFactory({
acquireTaskLock: acquireTaskLockFactory({ db }),
releaseTaskLock: releaseTaskLockFactory({ db })
})
const every5Mins = '*/5 * * * *'
return scheduleExecution(
every5Mins,
'RollbackStalePreparedTransactions',
async (_scheduledTime, { logger }) => {
await rollbackStalePreparedTransactions({ logger, allRegions })
}
)
}
@@ -0,0 +1,39 @@
import type { Knex } from 'knex'
import { db } from '@/db/knex'
import { getStalePreparedTransactionsFactory } from '@/modules/multiregion/repositories/transactions'
import {
prepareTransaction,
rollbackPreparedTransaction
} from '@/modules/shared/helpers/dbHelper'
import { wait } from '@speckle/shared'
import { expect } from 'chai'
describe('prepared transaction repository functions', () => {
describe('getStalePreparedTransactionsFactory returns a function, that', () => {
let trx: Knex
let transactionId: string = ''
beforeEach(async () => {
trx = await db.transaction()
transactionId = await prepareTransaction(trx)
})
afterEach(async () => {
await rollbackPreparedTransaction(trx, transactionId)
})
it('returns prepared transactions older than a given time interval', async () => {
await wait(5000)
const result = await getStalePreparedTransactionsFactory({ db })({
interval: '1 second'
})
expect(result.length).to.equal(1)
expect(result.at(0)?.gid).to.equal(transactionId)
})
it('does not return recently prepared transactions', async () => {
const result = await getStalePreparedTransactionsFactory({ db })({})
expect(result.length).to.equal(0)
})
})
})
@@ -1,7 +1,7 @@
/* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-explicit-any */
import type { Knex } from 'knex' import type { Knex } from 'knex'
import { postgresMaxConnections } from '@/modules/shared/helpers/envHelper' import { postgresMaxConnections } from '@/modules/shared/helpers/envHelper'
import { EnvironmentResourceError } from '@/modules/shared/errors' import { EnvironmentResourceError, LogicError } from '@/modules/shared/errors'
import type { MaybeAsync } from '@speckle/shared' import type { MaybeAsync } from '@speckle/shared'
import { isNonNullable } from '@speckle/shared' import { isNonNullable } from '@speckle/shared'
import { base64Decode, base64Encode } from '@/modules/shared/helpers/cryptoHelper' import { base64Decode, base64Encode } from '@/modules/shared/helpers/cryptoHelper'
@@ -10,6 +10,7 @@ import dayjs from 'dayjs'
import type { MaybeNullOrUndefined, Nullable } from '@speckle/shared' import type { MaybeNullOrUndefined, Nullable } from '@speckle/shared'
import type { SchemaConfig } from '@/modules/core/dbSchema' import type { SchemaConfig } from '@/modules/core/dbSchema'
import { has, isObjectLike, isString, mapValues, pick, times } from 'lodash-es' import { has, isObjectLike, isString, mapValues, pick, times } from 'lodash-es'
import cryptoRandomString from 'crypto-random-string'
export type Collection<T> = { export type Collection<T> = {
cursor: string | null cursor: string | null
@@ -302,3 +303,22 @@ export const compositeCursorTools = <
resolveNewCursor resolveNewCursor
} }
} }
export const prepareTransaction = async (db: Knex): Promise<string> => {
if (!db.isTransaction) {
throw new LogicError('Cannot PREPARE postgres operation outside of a transaction')
}
const preparedId = cryptoRandomString({ length: 10 })
await db.raw(`PREPARE TRANSACTION '${preparedId}';`)
return preparedId
}
export const rollbackPreparedTransaction = async (
db: Knex,
gid: string
): Promise<void> => {
await db.raw(`ROLLBACK PREPARED '${gid}';`)
}
+1 -1
View File
@@ -22,4 +22,4 @@ COPY --link --from=builder /aiven-extras/aiven_extras.so /usr/local/lib/postgres
EXPOSE 5432 EXPOSE 5432
CMD ["postgres"] CMD ["postgres", "-c", "max_prepared_transactions=150"]