diff --git a/packages/server/modules/shared/domain/constants.ts b/packages/server/modules/shared/domain/constants.ts new file mode 100644 index 000000000..397281db6 --- /dev/null +++ b/packages/server/modules/shared/domain/constants.ts @@ -0,0 +1,3 @@ +import { StringEnum } from '@speckle/shared' + +export const PromiseAllSettledResultStatus = StringEnum(['rejected', 'fulfilled']) diff --git a/packages/server/modules/shared/errors/index.ts b/packages/server/modules/shared/errors/index.ts index ff8fac6cd..b01158531 100644 --- a/packages/server/modules/shared/errors/index.ts +++ b/packages/server/modules/shared/errors/index.ts @@ -163,5 +163,50 @@ export class TestOnlyLogicError extends BaseError { static statusCode = 500 } +const getErrorInfoFromTransactions = ( + preparedTransactions: { knex: Knex; preparedId: string }[] +) => { + return preparedTransactions.map(({ knex, preparedId }) => ({ + db: retrieveMetadataFromDatabaseClient(knex), + gid: preparedId + })) +} + +// 2PC failed but we successfully rolled back all prepared transactions. +export class RegionalTransactionError extends BaseError { + static code = 'REGIONAL_TRANSACTION_ERROR' + static defaultMessage = 'Failed to complete 2PC operation' + static statusCode = 500 + + constructor( + message?: string | null, + preparedTransactions: { knex: Knex; preparedId: string }[] = [] + ) { + super(message, { + info: { + clients: getErrorInfoFromTransactions(preparedTransactions) + } + }) + } +} + +// 2PC failed and we failed to rollback. A prepared transaction may have been left behind. +export class RegionalTransactionFatalError extends BaseError { + static code = 'REGIONAL_TRANSACTION_FATAL_ERROR' + static defaultMessage = 'Failed to rollback 2PC operation' + static statusCode = 500 + + constructor( + message?: string | null, + preparedTransactions: { knex: Knex; preparedId: string }[] = [] + ) { + super(message, { + info: { + clients: getErrorInfoFromTransactions(preparedTransactions) + } + }) + } +} + export { BaseError } export type { Info } diff --git a/packages/server/modules/shared/helpers/dbHelper.ts b/packages/server/modules/shared/helpers/dbHelper.ts index 56f520129..71e2b9681 100644 --- a/packages/server/modules/shared/helpers/dbHelper.ts +++ b/packages/server/modules/shared/helpers/dbHelper.ts @@ -1,7 +1,12 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import type { Knex } from 'knex' import { postgresMaxConnections } from '@/modules/shared/helpers/envHelper' -import { EnvironmentResourceError, LogicError } from '@/modules/shared/errors' +import { + EnvironmentResourceError, + LogicError, + RegionalTransactionFatalError, + RegionalTransactionError +} from '@/modules/shared/errors' import type { MaybeAsync } from '@speckle/shared' import { isNonNullable } from '@speckle/shared' import { base64Decode, base64Encode } from '@/modules/shared/helpers/cryptoHelper' @@ -11,6 +16,9 @@ import type { MaybeNullOrUndefined, Nullable } from '@speckle/shared' import type { SchemaConfig } from '@/modules/core/dbSchema' import { has, isObjectLike, isString, mapValues, pick, times } from 'lodash-es' import cryptoRandomString from 'crypto-random-string' +import { logger } from '@/observability/logging' +import { isEqual } from 'lodash-es' +import { PromiseAllSettledResultStatus } from '@/modules/shared/domain/constants' export type Collection = { cursor: string | null @@ -316,9 +324,115 @@ export const prepareTransaction = async (db: Knex): Promise => { return preparedId } +export const commitPreparedTransaction = async ( + db: Knex, + gid: string +): Promise => { + await db.raw(`COMMIT PREPARED '${gid}';`) +} + export const rollbackPreparedTransaction = async ( db: Knex, gid: string ): Promise => { await db.raw(`ROLLBACK PREPARED '${gid}';`) } + +export const replicateQuery = ( + dbs: Knex[], + factory: ({ db }: { db: Knex }) => (params: T) => Promise +) => { + return async (params: T) => { + const preparedTransactions: { + knex: Knex + preparedId: string + }[] = [] + + const returnValues: U[] = [] + + try { + // Phase 1: Prepare transaction across all specified db instances + for (const db of dbs) { + const trx = await db.transaction() + const returnValue = await factory({ db: trx })(params) + returnValues.push(returnValue) + const preparedId = await prepareTransaction(trx) + preparedTransactions.push({ knex: db, preparedId }) + } + + // Phase 2: Attempt commit of all prepared transactions + const results = await Promise.allSettled( + preparedTransactions.map(({ knex, preparedId }) => { + return commitPreparedTransaction(knex, preparedId) + }) + ) + + const errors = results.filter((result): result is PromiseRejectedResult => { + return result.status === PromiseAllSettledResultStatus.rejected + }) + + if (errors.length > 0) { + logger.error( + { + params, + errors, + errorCount: errors.length, + resultCount: results.length + }, + `Failed {errorCount} of {resultCount} transactions in 2PC operation.` + ) + throw new RegionalTransactionError( + 'Failed some or all transactions in 2PC operation.', + preparedTransactions + ) + } + + // TODO: Do we need this validation? + if (!returnValues.every((value) => isEqual(value, returnValues[0]))) { + throw new RegionalTransactionError( + 'Return values of 2PC transactions do not match', + preparedTransactions + ) + } + + return returnValues[0] + } catch { + const rollbacks = preparedTransactions.map(async ({ knex, preparedId }) => { + try { + await rollbackPreparedTransaction(knex, preparedId) + } catch (err) { + logger.error( + { preparedId }, + 'Failed to rollback prepared transaction {preparedId}' + ) + throw err + } + }) + + logger.warn( + { + preparedTransactions: preparedTransactions.map(({ preparedId }) => preparedId) + }, + 'Error during 2PC operation. Rolling back all transactions.' + ) + + const results = await Promise.allSettled(rollbacks) + + if ( + results.some( + (result) => result.status === PromiseAllSettledResultStatus.rejected + ) + ) { + throw new RegionalTransactionFatalError( + 'Failed to rollback all transactions.', + preparedTransactions + ) + } + + throw new RegionalTransactionError( + 'Failed to complete 2PC operation but successfully recovered.', + preparedTransactions + ) + } + } +} diff --git a/packages/server/modules/shared/test/dbHelper.spec.ts b/packages/server/modules/shared/test/dbHelper.spec.ts new file mode 100644 index 000000000..38f9eadce --- /dev/null +++ b/packages/server/modules/shared/test/dbHelper.spec.ts @@ -0,0 +1,133 @@ +import { getDb } from '@/modules/multiregion/utils/dbSelector' +import { Scopes } from '@/modules/core/dbSchema' +import { expect } from 'chai' +import type { Knex } from 'knex' +import { replicateQuery } from '@/modules/shared/helpers/dbHelper' +import { isMultiRegionTestMode } from '@/test/speckle-helpers/regions' + +isMultiRegionTestMode() + ? describe('Prepared transaction utils (2PC) @multiregion', async () => { + let main: Knex + let region1: Knex + let region2: Knex + let ALL_DBS: Knex[] = [] + + const testOperationFactory = + ({ db }: { db: Knex }) => + async (payload: { + name: string + description: string + public: boolean + }): Promise => { + await db(Scopes.name).insert(payload) + } + + before(async () => { + main = await getDb({ regionKey: null }) + region1 = await getDb({ regionKey: 'region1' }) + region2 = await getDb({ regionKey: 'region2' }) + ALL_DBS = [main, region1, region2] + }) + + it('successfully replicates operation across all specified db instances', async () => { + const testOperationParams = { + name: 'test:scope:a', + description: 'for test purposes only', + public: false + } + + await replicateQuery(ALL_DBS, testOperationFactory)(testOperationParams) + + const scopeMain = await main + .table(Scopes.name) + .where({ name: testOperationParams.name }) + .first() + const scopeRegion1 = await region1 + .table(Scopes.name) + .where({ name: testOperationParams.name }) + .first() + const scopeRegion2 = await region2 + .table(Scopes.name) + .where({ name: testOperationParams.name }) + .first() + + expect(scopeMain).to.deep.eq(testOperationParams) + expect(scopeMain).to.deep.equal(scopeRegion1) + expect(scopeMain).to.deep.equal(scopeRegion2) + }) + + it('rolls back when one node fails on write', async () => { + // Create scope before replicated query + const testOperationParams = { + name: 'test:scope:b', + description: 'for test purposes only', + public: false + } + + await testOperationFactory({ db: region2 })(testOperationParams) + + const promise = replicateQuery( + ALL_DBS, + testOperationFactory + )(testOperationParams) + + await expect(promise).eventually.to.be.rejected + + const scopeMain = await main + .table(Scopes.name) + .where({ name: testOperationParams.name }) + .first() + const scopeRegion1 = await region1 + .table(Scopes.name) + .where({ name: testOperationParams.name }) + .first() + const scopeRegion2 = await region2 + .table(Scopes.name) + .where({ name: testOperationParams.name }) + .first() + + expect(scopeMain).to.be.undefined + expect(scopeRegion1).to.be.undefined + expect(scopeRegion2).to.exist + }) + + it('rolls back all commits in case of one node failure on transaction', async () => { + const testOperationParams = { + name: 'test:scope:c', + description: 'for test purposes only', + public: false + } + + const dbThatFails = { + transaction: () => + Promise.resolve(() => ({ + insert: () => Promise.resolve() + })) // will fail on raw call + } as unknown as Knex + + const promise = replicateQuery( + [...ALL_DBS, dbThatFails], + testOperationFactory + )(testOperationParams) + + await expect(promise).to.eventually.be.rejected + + const scopeMain = await main + .table(Scopes.name) + .where({ name: testOperationParams.name }) + .first() + const scopeRegion1 = await region1 + .table(Scopes.name) + .where({ name: testOperationParams.name }) + .first() + const scopeRegion2 = await region2 + .table(Scopes.name) + .where({ name: testOperationParams.name }) + .first() + + expect(scopeMain).to.be.undefined + expect(scopeRegion1).to.be.undefined + expect(scopeRegion2).to.be.undefined + }) + }) + : null diff --git a/packages/server/modules/workspaces/tests/integration/projects.graph.spec.ts b/packages/server/modules/workspaces/tests/integration/projects.graph.spec.ts index b0e7fc8b2..99a1f4f0c 100644 --- a/packages/server/modules/workspaces/tests/integration/projects.graph.spec.ts +++ b/packages/server/modules/workspaces/tests/integration/projects.graph.spec.ts @@ -992,7 +992,7 @@ describe('Workspace project GQL CRUD', () => { }) isMultiRegionTestMode() - ? describe('when the default server db region is not the main db', () => { + ? describe('when the default server db region is not the main db @multiregion', () => { const regionalProject: StreamRecord = { id: cryptoRandomString({ length: 9 }), name: 'My Special Project', diff --git a/packages/server/test/hooks.ts b/packages/server/test/hooks.ts index 03f4aa0f9..e0d870ab2 100644 --- a/packages/server/test/hooks.ts +++ b/packages/server/test/hooks.ts @@ -45,6 +45,8 @@ import { set } from 'lodash-es' import { fixStackTrace } from '@/test/speckle-helpers/error' import { EnvironmentResourceError } from '@/modules/shared/errors' import * as mocha from 'mocha' +import { getStalePreparedTransactionsFactory } from '@/modules/multiregion/repositories/transactions' +import { rollbackPreparedTransaction } from '@/modules/shared/helpers/dbHelper' // Register chai plugins chai.use(chaiAsPromised) @@ -258,6 +260,13 @@ const resetSchemaFactory = const resetPubSub = resetPubSubFactory(deps) const truncate = truncateTablesFactory(deps) + const pendingTransactions = await getStalePreparedTransactionsFactory({ + db: deps.db + })({ interval: '1 second' }) + await Promise.all( + pendingTransactions.map(({ gid }) => rollbackPreparedTransaction(deps.db, gid)) + ) + await unlockFactory(deps)() await resetPubSub() await truncate() // otherwise some rollbacks will fail