fix(regions): move webhooks because they deserve love too

This commit is contained in:
Charles Driesler
2025-02-11 00:43:15 +00:00
parent c416484974
commit 21bf57f514
5 changed files with 119 additions and 5 deletions
@@ -372,3 +372,6 @@ export type CopyProjectAutomations = (params: {
export type CopyProjectComments = (params: {
projectIds: string[]
}) => Promise<Record<string, number>>
export type CopyProjectWebhooks = (params: {
projectIds: string[]
}) => Promise<Record<string, number>>
@@ -16,6 +16,7 @@ import {
copyProjectObjectsFactory,
copyProjectsFactory,
copyProjectVersionsFactory,
copyProjectWebhooksFactory,
copyWorkspaceFactory
} from '@/modules/workspaces/repositories/projectRegions'
import {
@@ -37,6 +38,7 @@ import { getProjectAutomationsTotalCountFactory } from '@/modules/automate/repos
import { getFeatureFlags, isTestEnv } from '@/modules/shared/helpers/envHelper'
import { WorkspacesNotYetImplementedError } from '@/modules/workspaces/errors/workspace'
import { getStreamCommentCountFactory } from '@/modules/comments/repositories/comments'
import { getStreamWebhooksFactory } from '@/modules/webhooks/repositories/webhooks'
const { FF_MOVE_PROJECT_REGION_ENABLED } = getFeatureFlags()
@@ -100,6 +102,7 @@ export default {
db: sourceDb
}),
countProjectComments: getStreamCommentCountFactory({ db: sourceDb }),
getProjectWebhooks: getStreamWebhooksFactory({ db: sourceDb }),
getAvailableRegions: getAvailableRegionsFactory({
getRegions: getRegionsFactory({ db }),
canWorkspaceUseRegions: canWorkspaceUseRegionsFactory({
@@ -112,7 +115,8 @@ export default {
copyProjectVersions: copyProjectVersionsFactory({ sourceDb, targetDb }),
copyProjectObjects: copyProjectObjectsFactory({ sourceDb, targetDb }),
copyProjectAutomations: copyProjectAutomationsFactory({ sourceDb, targetDb }),
copyProjectComments: copyProjectCommentsFactory({ sourceDb, targetDb })
copyProjectComments: copyProjectCommentsFactory({ sourceDb, targetDb }),
copyProjectWebhooks: copyProjectWebhooksFactory({ sourceDb, targetDb })
})
return await withTransaction(updateProjectRegion(args), targetDb)
@@ -38,6 +38,7 @@ import {
CopyProjectObjects,
CopyProjects,
CopyProjectVersions,
CopyProjectWebhooks,
CopyWorkspace
} from '@/modules/workspaces/domain/operations'
import { WorkspaceNotFoundError } from '@/modules/workspaces/errors/workspace'
@@ -60,6 +61,7 @@ import {
CommentRecord,
CommentViewRecord
} from '@/modules/comments/helpers/types'
import { Webhook, WebhookEvent } from '@/modules/webhooks/domain/types'
const tables = {
workspaces: (db: Knex) => db<Workspace>(Workspaces.name),
@@ -87,7 +89,9 @@ const tables = {
db<AutomationFunctionRunRecord>(AutomationFunctionRuns.name),
comments: (db: Knex) => db.table<CommentRecord>(Comments.name),
commentViews: (db: Knex) => db.table<CommentViewRecord>(CommentViews.name),
commentLinks: (db: Knex) => db.table<CommentLinkRecord>(CommentLinks.name)
commentLinks: (db: Knex) => db.table<CommentLinkRecord>(CommentLinks.name),
webhooks: (db: Knex) => db.table<Webhook>('webhooks_config'),
webhookEvents: (db: Knex) => db.table<WebhookEvent>('webhooks_events')
}
/**
@@ -527,3 +531,49 @@ export const copyProjectCommentsFactory =
return copiedCommentCountByProjectId
}
/**
* Copies rows from the following tables:
* - webhooks_config
* - webhooks_events
*/
export const copyProjectWebhooksFactory =
(deps: { sourceDb: Knex; targetDb: Knex }): CopyProjectWebhooks =>
async ({ projectIds }) => {
const copiedWebhookCountByProjectId: Record<string, number> = {}
// Copy `webhooks_config` table rows in batches
const selectWebhooks = tables
.webhooks(deps.sourceDb)
.select('*')
.whereIn('streamId', projectIds)
for await (const webhooks of executeBatchedSelect(selectWebhooks)) {
const webhookIds = webhooks.map((webhook) => webhook.id)
// Write `webhooks_config` rows to target db
await tables.webhooks(deps.targetDb).insert(webhooks).onConflict().ignore()
for (const webhook of webhooks) {
copiedWebhookCountByProjectId[webhook.streamId] ??= 0
copiedWebhookCountByProjectId[webhook.streamId]++
}
// Copy `webhooks_events` table rows in batches
const selectWebhookEvents = tables
.webhookEvents(deps.sourceDb)
.select('*')
.whereIn('webhookId', webhookIds)
for await (const webhookEvents of executeBatchedSelect(selectWebhookEvents)) {
// Write `webhooks_events` rows to target db
await tables
.webhookEvents(deps.targetDb)
.insert(webhookEvents)
.onConflict()
.ignore()
}
}
return copiedWebhookCountByProjectId
}
@@ -4,6 +4,7 @@ import { GetStreamBranchCount } from '@/modules/core/domain/branches/operations'
import { GetStreamCommitCount } from '@/modules/core/domain/commits/operations'
import { GetStreamObjectCount } from '@/modules/core/domain/objects/operations'
import { GetProject } from '@/modules/core/domain/projects/operations'
import { GetStreamWebhooks } from '@/modules/webhooks/domain/operations'
import {
CopyProjectAutomations,
CopyProjectComments,
@@ -11,6 +12,7 @@ import {
CopyProjectObjects,
CopyProjects,
CopyProjectVersions,
CopyProjectWebhooks,
CopyWorkspace,
GetAvailableRegions,
UpdateProjectRegion
@@ -25,6 +27,7 @@ export const updateProjectRegionFactory =
countProjectObjects: GetStreamObjectCount
countProjectAutomations: GetProjectAutomationCount
countProjectComments: GetStreamCommentCount
getProjectWebhooks: GetStreamWebhooks
getAvailableRegions: GetAvailableRegions
copyWorkspace: CopyWorkspace
copyProjects: CopyProjects
@@ -33,6 +36,7 @@ export const updateProjectRegionFactory =
copyProjectObjects: CopyProjectObjects
copyProjectAutomations: CopyProjectAutomations
copyProjectComments: CopyProjectComments
copyProjectWebhooks: CopyProjectWebhooks
}): UpdateProjectRegion =>
async (params) => {
const { projectId, regionKey } = params
@@ -80,8 +84,11 @@ export const updateProjectRegionFactory =
// Move comments
const copiedCommentCount = await deps.copyProjectComments({ projectIds })
// Move webhooks
const copiedWebhookCount = await deps.copyProjectWebhooks({ projectIds })
// TODO: Move file blobs
// TODO: Move webhooks
// TODO: Validate state after move captures latest state of project
const sourceProjectModelCount = await deps.countProjectModels(projectId)
@@ -93,13 +100,15 @@ export const updateProjectRegionFactory =
projectId
})
const sourceProjectCommentCount = await deps.countProjectComments(projectId)
const sourceProjectWebhooks = await deps.getProjectWebhooks({ streamId: projectId })
const tests = [
copiedModelCount[projectId] === sourceProjectModelCount,
copiedVersionCount[projectId] === sourceProjectVersionCount,
copiedObjectCount[projectId] === sourceProjectObjectCount,
copiedAutomationCount[projectId] === sourceProjectAutomationCount,
copiedCommentCount[projectId] === sourceProjectCommentCount
copiedCommentCount[projectId] === sourceProjectCommentCount,
copiedWebhookCount[projectId] === sourceProjectWebhooks.length
]
if (!tests.every((test) => !!test)) {
@@ -32,6 +32,11 @@ import {
} from '@/modules/core/helpers/types'
import { grantStreamPermissionsFactory } from '@/modules/core/repositories/streams'
import { getDb } from '@/modules/multiregion/utils/dbSelector'
import { Webhook, WebhookEvent } from '@/modules/webhooks/domain/types'
import {
createWebhookConfigFactory,
createWebhookEventFactory
} from '@/modules/webhooks/repositories/webhooks'
import {
BasicTestWorkspace,
createTestWorkspace
@@ -98,7 +103,9 @@ const tables = {
db<AutomationRunTriggerRecord>(AutomationRunTriggers.name),
automationFunctionRuns: (db: Knex) =>
db<AutomationFunctionRunRecord>(AutomationFunctionRuns.name),
comments: (db: Knex) => db.table<CommentRecord>(Comments.name)
comments: (db: Knex) => db.table<CommentRecord>(Comments.name),
webhooks: (db: Knex) => db.table<Webhook>('webhooks_config'),
webhookEvents: (db: Knex) => db.table<WebhookEvent>('webhooks_events')
}
const grantStreamPermissions = grantStreamPermissionsFactory({ db })
@@ -391,8 +398,10 @@ isMultiRegionTestMode()
let testAutomationFunctionRuns: AutomationFunctionRunRecord[]
let testComment: CommentRecord
let testWebhookId: string
let apollo: TestApolloServer
let sourceRegionDb: Knex
let targetRegionDb: Knex
before(async () => {
@@ -400,6 +409,7 @@ isMultiRegionTestMode()
await waitForRegionUser(adminUser)
apollo = await testApolloServer({ authUserId: adminUser.id })
sourceRegionDb = await getDb({ regionKey: regionKey1 })
targetRegionDb = await getDb({ regionKey: regionKey2 })
})
@@ -462,6 +472,21 @@ isMultiRegionTestMode()
projectId: testProject.id,
objectId: testVersion.objectId
})
testWebhookId = await createWebhookConfigFactory({ db: sourceRegionDb })({
id: cryptoRandomString({ length: 9 }),
streamId: testProject.id,
url: 'https://example.org',
description: cryptoRandomString({ length: 9 }),
secret: cryptoRandomString({ length: 9 }),
enabled: false,
triggers: ['branch_create']
})
await createWebhookEventFactory({ db: sourceRegionDb })({
id: cryptoRandomString({ length: 9 }),
webhookId: testWebhookId,
payload: cryptoRandomString({ length: 9 })
})
})
it('moves project record to target regional db', async () => {
@@ -641,5 +666,28 @@ isMultiRegionTestMode()
expect(comment).to.not.be.undefined
})
it('moves project webhooks to target regional db', async () => {
const res = await apollo.execute(UpdateProjectRegionDocument, {
projectId: testProject.id,
regionKey: regionKey2
})
expect(res).to.not.haveGraphQLErrors()
const webhook = await tables
.webhooks(targetRegionDb)
.select('*')
.where({ id: testWebhookId })
.first()
expect(webhook).to.not.be.undefined
const webhookEvent = await tables
.webhookEvents(targetRegionDb)
.select('*')
.where({ webhookId: testWebhookId })
.first()
expect(webhookEvent).to.not.be.undefined
})
})
: void 0