diff --git a/docker-compose-deps.yml b/docker-compose-deps.yml index 4490f3523..f30d4ba8f 100644 --- a/docker-compose-deps.yml +++ b/docker-compose-deps.yml @@ -16,6 +16,7 @@ services: - ./setup/db/11-docker_postgres_keycloack_init.sql:/docker-entrypoint-initdb.d/11-docker_postgres_keycloack_init.sql ports: - '127.0.0.1:5432:5432' + command: postgres -c max_prepared_transactions=150 postgres-region1: build: @@ -32,6 +33,7 @@ services: - ./setup/db/11-docker_postgres_keycloack_init.sql:/docker-entrypoint-initdb.d/11-docker_postgres_keycloack_init.sql ports: - '127.0.0.1:5401:5432' + command: postgres -c max_prepared_transactions=150 postgres-region2: build: @@ -48,6 +50,7 @@ services: - ./setup/db/11-docker_postgres_keycloack_init.sql:/docker-entrypoint-initdb.d/11-docker_postgres_keycloack_init.sql ports: - '127.0.0.1:5402:5432' + command: postgres -c max_prepared_transactions=150 redis: image: 'valkey/valkey:8.1-alpine' diff --git a/packages/server/modules/multiregion/domain/types.ts b/packages/server/modules/multiregion/domain/types.ts index d41a87364..4176cb49f 100644 --- a/packages/server/modules/multiregion/domain/types.ts +++ b/packages/server/modules/multiregion/domain/types.ts @@ -13,3 +13,9 @@ export type ProjectRegion = { projectId: string regionKey: RegionKey } + +export type StalePendingTransaction = { + transaction: string + gid: string + prepared: Date +} diff --git a/packages/server/modules/multiregion/index.ts b/packages/server/modules/multiregion/index.ts index 0b6bba597..7faa39497 100644 --- a/packages/server/modules/multiregion/index.ts +++ b/packages/server/modules/multiregion/index.ts @@ -14,6 +14,10 @@ import { shutdownQueue, startQueue } from '@/modules/multiregion/services/queue' +import { scheduleStalePreparedTransactionCleanup } from '@/modules/multiregion/tasks/pendingTransactions' +import type cron from 'node-cron' + +let scheduledTasks: cron.ScheduledTask[] = [] const multiRegion: SpeckleModule = { async init({ isInitial }) { @@ -38,10 +42,14 @@ const multiRegion: SpeckleModule = { if (isInitial) { await initializeQueue() await startQueue() + scheduledTasks = [await scheduleStalePreparedTransactionCleanup()] } }, async shutdown() { await shutdownQueue() + scheduledTasks.forEach((task) => { + task.stop() + }) } } diff --git a/packages/server/modules/multiregion/repositories/transactions.ts b/packages/server/modules/multiregion/repositories/transactions.ts new file mode 100644 index 000000000..2a3afc0c4 --- /dev/null +++ b/packages/server/modules/multiregion/repositories/transactions.ts @@ -0,0 +1,13 @@ +import type { StalePendingTransaction } from '@/modules/multiregion/domain/types' +import type { Knex } from 'knex' + +export const getStalePreparedTransactionsFactory = + ({ db }: { db: Knex }) => + async (args: { interval?: string }): Promise => { + const { interval = '2 minutes' } = args + return ( + await db.raw<{ rows: StalePendingTransaction[] }>( + `SELECT * FROM pg_prepared_xacts WHERE prepared < NOW() - INTERVAL '${interval}';` + ) + ).rows + } diff --git a/packages/server/modules/multiregion/tasks/pendingTransactions.ts b/packages/server/modules/multiregion/tasks/pendingTransactions.ts new file mode 100644 index 000000000..4428ed0ae --- /dev/null +++ b/packages/server/modules/multiregion/tasks/pendingTransactions.ts @@ -0,0 +1,54 @@ +import { scheduleExecutionFactory } from '@/modules/core/services/taskScheduler' +import { + acquireTaskLockFactory, + releaseTaskLockFactory +} from '@/modules/core/repositories/scheduledTasks' +import { db } from '@/db/knex' +import type { Logger } from '@/observability/logging' +import type { Knex } from 'knex' +import { getAllRegisteredDbClients } from '@/modules/multiregion/utils/dbSelector' +import { getStalePreparedTransactionsFactory } from '@/modules/multiregion/repositories/transactions' +import { rollbackPreparedTransaction } from '@/modules/shared/helpers/dbHelper' + +const rollbackStalePreparedTransactions = async ({ + allRegions, + logger +}: { + allRegions: { client: Knex; regionKey: string }[] + logger: Logger +}): Promise => { + for (const { regionKey, client } of allRegions) { + const getStalePreparedTransactions = getStalePreparedTransactionsFactory({ + db: client + }) + const pendingTransactions = await getStalePreparedTransactions({}) + if (!pendingTransactions.length) continue + + logger.error( + { pendingTransactions, regionKey }, + 'Found stale prepared transactions.' + ) + + await Promise.allSettled( + pendingTransactions.map(({ gid }) => rollbackPreparedTransaction(client, gid)) + ) + } +} + +export const scheduleStalePreparedTransactionCleanup = async () => { + const allRegions = await getAllRegisteredDbClients() + + const scheduleExecution = scheduleExecutionFactory({ + acquireTaskLock: acquireTaskLockFactory({ db }), + releaseTaskLock: releaseTaskLockFactory({ db }) + }) + + const every5Mins = '*/5 * * * *' + return scheduleExecution( + every5Mins, + 'RollbackStalePreparedTransactions', + async (_scheduledTime, { logger }) => { + await rollbackStalePreparedTransactions({ logger, allRegions }) + } + ) +} diff --git a/packages/server/modules/multiregion/tests/integration/repositories/transactions.spec.ts b/packages/server/modules/multiregion/tests/integration/repositories/transactions.spec.ts new file mode 100644 index 000000000..e55d71be6 --- /dev/null +++ b/packages/server/modules/multiregion/tests/integration/repositories/transactions.spec.ts @@ -0,0 +1,39 @@ +import type { Knex } from 'knex' +import { db } from '@/db/knex' +import { getStalePreparedTransactionsFactory } from '@/modules/multiregion/repositories/transactions' +import { + prepareTransaction, + rollbackPreparedTransaction +} from '@/modules/shared/helpers/dbHelper' +import { wait } from '@speckle/shared' +import { expect } from 'chai' + +describe('prepared transaction repository functions', () => { + describe('getStalePreparedTransactionsFactory returns a function, that', () => { + let trx: Knex + let transactionId: string = '' + + beforeEach(async () => { + trx = await db.transaction() + transactionId = await prepareTransaction(trx) + }) + + afterEach(async () => { + await rollbackPreparedTransaction(trx, transactionId) + }) + + it('returns prepared transactions older than a given time interval', async () => { + await wait(5000) + const result = await getStalePreparedTransactionsFactory({ db })({ + interval: '1 second' + }) + expect(result.length).to.equal(1) + expect(result.at(0)?.gid).to.equal(transactionId) + }) + + it('does not return recently prepared transactions', async () => { + const result = await getStalePreparedTransactionsFactory({ db })({}) + expect(result.length).to.equal(0) + }) + }) +}) diff --git a/packages/server/modules/shared/helpers/dbHelper.ts b/packages/server/modules/shared/helpers/dbHelper.ts index 713351ff1..56f520129 100644 --- a/packages/server/modules/shared/helpers/dbHelper.ts +++ b/packages/server/modules/shared/helpers/dbHelper.ts @@ -1,7 +1,7 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import type { Knex } from 'knex' import { postgresMaxConnections } from '@/modules/shared/helpers/envHelper' -import { EnvironmentResourceError } from '@/modules/shared/errors' +import { EnvironmentResourceError, LogicError } from '@/modules/shared/errors' import type { MaybeAsync } from '@speckle/shared' import { isNonNullable } from '@speckle/shared' import { base64Decode, base64Encode } from '@/modules/shared/helpers/cryptoHelper' @@ -10,6 +10,7 @@ import dayjs from 'dayjs' import type { MaybeNullOrUndefined, Nullable } from '@speckle/shared' import type { SchemaConfig } from '@/modules/core/dbSchema' import { has, isObjectLike, isString, mapValues, pick, times } from 'lodash-es' +import cryptoRandomString from 'crypto-random-string' export type Collection = { cursor: string | null @@ -302,3 +303,22 @@ export const compositeCursorTools = < resolveNewCursor } } + +export const prepareTransaction = async (db: Knex): Promise => { + if (!db.isTransaction) { + throw new LogicError('Cannot PREPARE postgres operation outside of a transaction') + } + + const preparedId = cryptoRandomString({ length: 10 }) + + await db.raw(`PREPARE TRANSACTION '${preparedId}';`) + + return preparedId +} + +export const rollbackPreparedTransaction = async ( + db: Knex, + gid: string +): Promise => { + await db.raw(`ROLLBACK PREPARED '${gid}';`) +} diff --git a/utils/postgres/Dockerfile b/utils/postgres/Dockerfile index dc9a9ea06..e199d620d 100644 --- a/utils/postgres/Dockerfile +++ b/utils/postgres/Dockerfile @@ -22,4 +22,4 @@ COPY --link --from=builder /aiven-extras/aiven_extras.so /usr/local/lib/postgres EXPOSE 5432 -CMD ["postgres"] +CMD ["postgres", "-c", "max_prepared_transactions=150"]