Merge branch 'main' into andrew/filter-by-selection
This commit is contained in:
@@ -0,0 +1,3 @@
|
||||
import { StringEnum } from '@speckle/shared'
|
||||
|
||||
export const PromiseAllSettledResultStatus = StringEnum(['rejected', 'fulfilled'])
|
||||
@@ -163,5 +163,50 @@ export class TestOnlyLogicError extends BaseError {
|
||||
static statusCode = 500
|
||||
}
|
||||
|
||||
const getErrorInfoFromTransactions = (
|
||||
preparedTransactions: { knex: Knex; preparedId: string }[]
|
||||
) => {
|
||||
return preparedTransactions.map(({ knex, preparedId }) => ({
|
||||
db: retrieveMetadataFromDatabaseClient(knex),
|
||||
gid: preparedId
|
||||
}))
|
||||
}
|
||||
|
||||
// 2PC failed but we successfully rolled back all prepared transactions.
|
||||
export class RegionalTransactionError extends BaseError {
|
||||
static code = 'REGIONAL_TRANSACTION_ERROR'
|
||||
static defaultMessage = 'Failed to complete 2PC operation'
|
||||
static statusCode = 500
|
||||
|
||||
constructor(
|
||||
message?: string | null,
|
||||
preparedTransactions: { knex: Knex; preparedId: string }[] = []
|
||||
) {
|
||||
super(message, {
|
||||
info: {
|
||||
clients: getErrorInfoFromTransactions(preparedTransactions)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// 2PC failed and we failed to rollback. A prepared transaction may have been left behind.
|
||||
export class RegionalTransactionFatalError extends BaseError {
|
||||
static code = 'REGIONAL_TRANSACTION_FATAL_ERROR'
|
||||
static defaultMessage = 'Failed to rollback 2PC operation'
|
||||
static statusCode = 500
|
||||
|
||||
constructor(
|
||||
message?: string | null,
|
||||
preparedTransactions: { knex: Knex; preparedId: string }[] = []
|
||||
) {
|
||||
super(message, {
|
||||
info: {
|
||||
clients: getErrorInfoFromTransactions(preparedTransactions)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
export { BaseError }
|
||||
export type { Info }
|
||||
|
||||
@@ -1,7 +1,12 @@
|
||||
/* eslint-disable @typescript-eslint/no-explicit-any */
|
||||
import type { Knex } from 'knex'
|
||||
import { postgresMaxConnections } from '@/modules/shared/helpers/envHelper'
|
||||
import { EnvironmentResourceError, LogicError } from '@/modules/shared/errors'
|
||||
import {
|
||||
EnvironmentResourceError,
|
||||
LogicError,
|
||||
RegionalTransactionFatalError,
|
||||
RegionalTransactionError
|
||||
} from '@/modules/shared/errors'
|
||||
import type { MaybeAsync } from '@speckle/shared'
|
||||
import { isNonNullable } from '@speckle/shared'
|
||||
import { base64Decode, base64Encode } from '@/modules/shared/helpers/cryptoHelper'
|
||||
@@ -11,6 +16,9 @@ import type { MaybeNullOrUndefined, Nullable } from '@speckle/shared'
|
||||
import type { SchemaConfig } from '@/modules/core/dbSchema'
|
||||
import { has, isObjectLike, isString, mapValues, pick, times } from 'lodash-es'
|
||||
import cryptoRandomString from 'crypto-random-string'
|
||||
import { logger } from '@/observability/logging'
|
||||
import { isEqual } from 'lodash-es'
|
||||
import { PromiseAllSettledResultStatus } from '@/modules/shared/domain/constants'
|
||||
|
||||
export type Collection<T> = {
|
||||
cursor: string | null
|
||||
@@ -316,9 +324,115 @@ export const prepareTransaction = async (db: Knex): Promise<string> => {
|
||||
return preparedId
|
||||
}
|
||||
|
||||
export const commitPreparedTransaction = async (
|
||||
db: Knex,
|
||||
gid: string
|
||||
): Promise<void> => {
|
||||
await db.raw(`COMMIT PREPARED '${gid}';`)
|
||||
}
|
||||
|
||||
export const rollbackPreparedTransaction = async (
|
||||
db: Knex,
|
||||
gid: string
|
||||
): Promise<void> => {
|
||||
await db.raw(`ROLLBACK PREPARED '${gid}';`)
|
||||
}
|
||||
|
||||
export const replicateQuery = <T, U>(
|
||||
dbs: Knex[],
|
||||
factory: ({ db }: { db: Knex }) => (params: T) => Promise<U>
|
||||
) => {
|
||||
return async (params: T) => {
|
||||
const preparedTransactions: {
|
||||
knex: Knex
|
||||
preparedId: string
|
||||
}[] = []
|
||||
|
||||
const returnValues: U[] = []
|
||||
|
||||
try {
|
||||
// Phase 1: Prepare transaction across all specified db instances
|
||||
for (const db of dbs) {
|
||||
const trx = await db.transaction()
|
||||
const returnValue = await factory({ db: trx })(params)
|
||||
returnValues.push(returnValue)
|
||||
const preparedId = await prepareTransaction(trx)
|
||||
preparedTransactions.push({ knex: db, preparedId })
|
||||
}
|
||||
|
||||
// Phase 2: Attempt commit of all prepared transactions
|
||||
const results = await Promise.allSettled(
|
||||
preparedTransactions.map(({ knex, preparedId }) => {
|
||||
return commitPreparedTransaction(knex, preparedId)
|
||||
})
|
||||
)
|
||||
|
||||
const errors = results.filter((result): result is PromiseRejectedResult => {
|
||||
return result.status === PromiseAllSettledResultStatus.rejected
|
||||
})
|
||||
|
||||
if (errors.length > 0) {
|
||||
logger.error(
|
||||
{
|
||||
params,
|
||||
errors,
|
||||
errorCount: errors.length,
|
||||
resultCount: results.length
|
||||
},
|
||||
`Failed {errorCount} of {resultCount} transactions in 2PC operation.`
|
||||
)
|
||||
throw new RegionalTransactionError(
|
||||
'Failed some or all transactions in 2PC operation.',
|
||||
preparedTransactions
|
||||
)
|
||||
}
|
||||
|
||||
// TODO: Do we need this validation?
|
||||
if (!returnValues.every((value) => isEqual(value, returnValues[0]))) {
|
||||
throw new RegionalTransactionError(
|
||||
'Return values of 2PC transactions do not match',
|
||||
preparedTransactions
|
||||
)
|
||||
}
|
||||
|
||||
return returnValues[0]
|
||||
} catch {
|
||||
const rollbacks = preparedTransactions.map(async ({ knex, preparedId }) => {
|
||||
try {
|
||||
await rollbackPreparedTransaction(knex, preparedId)
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
{ preparedId },
|
||||
'Failed to rollback prepared transaction {preparedId}'
|
||||
)
|
||||
throw err
|
||||
}
|
||||
})
|
||||
|
||||
logger.warn(
|
||||
{
|
||||
preparedTransactions: preparedTransactions.map(({ preparedId }) => preparedId)
|
||||
},
|
||||
'Error during 2PC operation. Rolling back all transactions.'
|
||||
)
|
||||
|
||||
const results = await Promise.allSettled(rollbacks)
|
||||
|
||||
if (
|
||||
results.some(
|
||||
(result) => result.status === PromiseAllSettledResultStatus.rejected
|
||||
)
|
||||
) {
|
||||
throw new RegionalTransactionFatalError(
|
||||
'Failed to rollback all transactions.',
|
||||
preparedTransactions
|
||||
)
|
||||
}
|
||||
|
||||
throw new RegionalTransactionError(
|
||||
'Failed to complete 2PC operation but successfully recovered.',
|
||||
preparedTransactions
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,133 @@
|
||||
import { getDb } from '@/modules/multiregion/utils/dbSelector'
|
||||
import { Scopes } from '@/modules/core/dbSchema'
|
||||
import { expect } from 'chai'
|
||||
import type { Knex } from 'knex'
|
||||
import { replicateQuery } from '@/modules/shared/helpers/dbHelper'
|
||||
import { isMultiRegionTestMode } from '@/test/speckle-helpers/regions'
|
||||
|
||||
isMultiRegionTestMode()
|
||||
? describe('Prepared transaction utils (2PC) @multiregion', async () => {
|
||||
let main: Knex
|
||||
let region1: Knex
|
||||
let region2: Knex
|
||||
let ALL_DBS: Knex[] = []
|
||||
|
||||
const testOperationFactory =
|
||||
({ db }: { db: Knex }) =>
|
||||
async (payload: {
|
||||
name: string
|
||||
description: string
|
||||
public: boolean
|
||||
}): Promise<void> => {
|
||||
await db(Scopes.name).insert(payload)
|
||||
}
|
||||
|
||||
before(async () => {
|
||||
main = await getDb({ regionKey: null })
|
||||
region1 = await getDb({ regionKey: 'region1' })
|
||||
region2 = await getDb({ regionKey: 'region2' })
|
||||
ALL_DBS = [main, region1, region2]
|
||||
})
|
||||
|
||||
it('successfully replicates operation across all specified db instances', async () => {
|
||||
const testOperationParams = {
|
||||
name: 'test:scope:a',
|
||||
description: 'for test purposes only',
|
||||
public: false
|
||||
}
|
||||
|
||||
await replicateQuery(ALL_DBS, testOperationFactory)(testOperationParams)
|
||||
|
||||
const scopeMain = await main
|
||||
.table(Scopes.name)
|
||||
.where({ name: testOperationParams.name })
|
||||
.first()
|
||||
const scopeRegion1 = await region1
|
||||
.table(Scopes.name)
|
||||
.where({ name: testOperationParams.name })
|
||||
.first()
|
||||
const scopeRegion2 = await region2
|
||||
.table(Scopes.name)
|
||||
.where({ name: testOperationParams.name })
|
||||
.first()
|
||||
|
||||
expect(scopeMain).to.deep.eq(testOperationParams)
|
||||
expect(scopeMain).to.deep.equal(scopeRegion1)
|
||||
expect(scopeMain).to.deep.equal(scopeRegion2)
|
||||
})
|
||||
|
||||
it('rolls back when one node fails on write', async () => {
|
||||
// Create scope before replicated query
|
||||
const testOperationParams = {
|
||||
name: 'test:scope:b',
|
||||
description: 'for test purposes only',
|
||||
public: false
|
||||
}
|
||||
|
||||
await testOperationFactory({ db: region2 })(testOperationParams)
|
||||
|
||||
const promise = replicateQuery(
|
||||
ALL_DBS,
|
||||
testOperationFactory
|
||||
)(testOperationParams)
|
||||
|
||||
await expect(promise).eventually.to.be.rejected
|
||||
|
||||
const scopeMain = await main
|
||||
.table(Scopes.name)
|
||||
.where({ name: testOperationParams.name })
|
||||
.first()
|
||||
const scopeRegion1 = await region1
|
||||
.table(Scopes.name)
|
||||
.where({ name: testOperationParams.name })
|
||||
.first()
|
||||
const scopeRegion2 = await region2
|
||||
.table(Scopes.name)
|
||||
.where({ name: testOperationParams.name })
|
||||
.first()
|
||||
|
||||
expect(scopeMain).to.be.undefined
|
||||
expect(scopeRegion1).to.be.undefined
|
||||
expect(scopeRegion2).to.exist
|
||||
})
|
||||
|
||||
it('rolls back all commits in case of one node failure on transaction', async () => {
|
||||
const testOperationParams = {
|
||||
name: 'test:scope:c',
|
||||
description: 'for test purposes only',
|
||||
public: false
|
||||
}
|
||||
|
||||
const dbThatFails = {
|
||||
transaction: () =>
|
||||
Promise.resolve(() => ({
|
||||
insert: () => Promise.resolve()
|
||||
})) // will fail on raw call
|
||||
} as unknown as Knex
|
||||
|
||||
const promise = replicateQuery(
|
||||
[...ALL_DBS, dbThatFails],
|
||||
testOperationFactory
|
||||
)(testOperationParams)
|
||||
|
||||
await expect(promise).to.eventually.be.rejected
|
||||
|
||||
const scopeMain = await main
|
||||
.table(Scopes.name)
|
||||
.where({ name: testOperationParams.name })
|
||||
.first()
|
||||
const scopeRegion1 = await region1
|
||||
.table(Scopes.name)
|
||||
.where({ name: testOperationParams.name })
|
||||
.first()
|
||||
const scopeRegion2 = await region2
|
||||
.table(Scopes.name)
|
||||
.where({ name: testOperationParams.name })
|
||||
.first()
|
||||
|
||||
expect(scopeMain).to.be.undefined
|
||||
expect(scopeRegion1).to.be.undefined
|
||||
expect(scopeRegion2).to.be.undefined
|
||||
})
|
||||
})
|
||||
: null
|
||||
@@ -992,7 +992,7 @@ describe('Workspace project GQL CRUD', () => {
|
||||
})
|
||||
|
||||
isMultiRegionTestMode()
|
||||
? describe('when the default server db region is not the main db', () => {
|
||||
? describe('when the default server db region is not the main db @multiregion', () => {
|
||||
const regionalProject: StreamRecord = {
|
||||
id: cryptoRandomString({ length: 9 }),
|
||||
name: 'My Special Project',
|
||||
|
||||
@@ -45,6 +45,8 @@ import { set } from 'lodash-es'
|
||||
import { fixStackTrace } from '@/test/speckle-helpers/error'
|
||||
import { EnvironmentResourceError } from '@/modules/shared/errors'
|
||||
import * as mocha from 'mocha'
|
||||
import { getStalePreparedTransactionsFactory } from '@/modules/multiregion/repositories/transactions'
|
||||
import { rollbackPreparedTransaction } from '@/modules/shared/helpers/dbHelper'
|
||||
|
||||
// Register chai plugins
|
||||
chai.use(chaiAsPromised)
|
||||
@@ -258,6 +260,13 @@ const resetSchemaFactory =
|
||||
const resetPubSub = resetPubSubFactory(deps)
|
||||
const truncate = truncateTablesFactory(deps)
|
||||
|
||||
const pendingTransactions = await getStalePreparedTransactionsFactory({
|
||||
db: deps.db
|
||||
})({ interval: '1 second' })
|
||||
await Promise.all(
|
||||
pendingTransactions.map(({ gid }) => rollbackPreparedTransaction(deps.db, gid))
|
||||
)
|
||||
|
||||
await unlockFactory(deps)()
|
||||
await resetPubSub()
|
||||
await truncate() // otherwise some rollbacks will fail
|
||||
|
||||
Reference in New Issue
Block a user