fix(regions): speed up inserts

This commit is contained in:
Chuck Driesler
2025-02-06 16:59:44 +00:00
parent b48721e85a
commit 04edd9c6af
6 changed files with 124 additions and 100 deletions
@@ -359,7 +359,7 @@ export type CopyWorkspace = (params: { workspaceId: string }) => Promise<string>
export type CopyProjects = (params: { projectIds: string[] }) => Promise<string[]>
export type CopyProjectModels = (params: {
projectIds: string[]
}) => Promise<Record<string, string[]>>
}) => Promise<Record<string, number>>
export type CopyProjectVersions = (params: {
projectIds: string[]
}) => Promise<Record<string, string[]>>
}) => Promise<Record<string, number>>
@@ -29,6 +29,10 @@ import { getProjectFactory } from '@/modules/core/repositories/projects'
import { getStreamBranchCountFactory } from '@/modules/core/repositories/branches'
import { getStreamCommitCountFactory } from '@/modules/core/repositories/commits'
import { withTransaction } from '@/modules/shared/helpers/dbHelper'
import { getFeatureFlags, isTestEnv } from '@/modules/shared/helpers/envHelper'
import { WorkspacesNotYetImplementedError } from '@/modules/workspaces/errors/workspace'
const { FF_MOVE_PROJECT_REGION_ENABLED } = getFeatureFlags()
export default {
Workspace: {
@@ -67,6 +71,10 @@ export default {
},
WorkspaceProjectMutations: {
moveToRegion: async (_parent, args, context) => {
if (!FF_MOVE_PROJECT_REGION_ENABLED && !isTestEnv()) {
throw new WorkspacesNotYetImplementedError()
}
await authorizeResolver(
context.userId,
args.projectId,
@@ -12,8 +12,10 @@ import { Commit } from '@/modules/core/domain/commits/types'
import { Stream } from '@/modules/core/domain/streams/types'
import {
BranchCommitRecord,
CommitRecord,
StreamCommitRecord,
StreamFavoriteRecord
StreamFavoriteRecord,
StreamRecord
} from '@/modules/core/helpers/types'
import { executeBatchedSelect } from '@/modules/shared/helpers/dbHelper'
import {
@@ -50,7 +52,11 @@ export const copyWorkspaceFactory =
throw new WorkspaceNotFoundError()
}
await tables.workspaces(deps.targetDb).insert(workspace)
await tables
.workspaces(deps.targetDb)
.insert(workspace)
.onConflict(Workspaces.withoutTablePrefix.col.id)
.merge(Workspaces.withoutTablePrefix.cols as (keyof Workspace)[])
return workspaceId
}
@@ -66,15 +72,15 @@ export const copyProjectsFactory =
// Copy project record
for await (const projects of executeBatchedSelect(selectProjects)) {
for (const project of projects) {
// Store copied project id
copiedProjectIds.push(project.id)
// Copy `streams` row to target db
await tables.projects(deps.targetDb).insert(project).onConflict().ignore()
}
const projectIds = projects.map((project) => project.id)
copiedProjectIds.push(...projectIds)
// Copy `streams` rows to target db
await tables
.projects(deps.targetDb)
.insert(projects)
.onConflict(Streams.withoutTablePrefix.col.id)
.merge(Streams.withoutTablePrefix.cols as (keyof StreamRecord)[])
// Fetch `stream_favorites` rows for projects in batch
const selectStreamFavorites = tables
@@ -83,14 +89,12 @@ export const copyProjectsFactory =
.whereIn(StreamFavorites.col.streamId, projectIds)
for await (const streamFavorites of executeBatchedSelect(selectStreamFavorites)) {
for (const streamFavorite of streamFavorites) {
// Copy `stream_favorites` row to target db
await tables
.streamFavorites(deps.targetDb)
.insert(streamFavorite)
.onConflict()
.ignore()
}
// Copy `stream_favorites` rows to target db
await tables
.streamFavorites(deps.targetDb)
.insert(streamFavorites)
.onConflict()
.ignore()
}
// Fetch `streams_meta` rows for projects in batch
@@ -102,14 +106,12 @@ export const copyProjectsFactory =
for await (const streamsMetadataBatch of executeBatchedSelect(
selectStreamsMetadata
)) {
for (const streamMetadata of streamsMetadataBatch) {
// Copy `streams_meta` row to target db
await tables
.streamsMeta(deps.targetDb)
.insert(streamMetadata)
.onConflict()
.ignore()
}
// Copy `streams_meta` rows to target db
await tables
.streamsMeta(deps.targetDb)
.insert(streamsMetadataBatch)
.onConflict()
.ignore()
}
}
@@ -119,98 +121,98 @@ export const copyProjectsFactory =
export const copyProjectModelsFactory =
(deps: { sourceDb: Knex; targetDb: Knex }): CopyProjectModels =>
async ({ projectIds }) => {
const copiedModelIds: Record<string, string[]> = projectIds.reduce(
(result, id) => ({ ...result, [id]: [] }),
{}
)
const copiedModelCountByProjectId: Record<string, number> = {}
for (const projectId of projectIds) {
const selectModels = tables
.models(deps.sourceDb)
.select('*')
.where({ streamId: projectId })
// Fetch `branches` rows for projects in batch
const selectModels = tables
.models(deps.sourceDb)
.select('*')
.whereIn(Branches.col.streamId, projectIds)
for await (const models of executeBatchedSelect(selectModels)) {
for (const model of models) {
// Store copied model ids
copiedModelIds[projectId].push(model.id)
for await (const models of executeBatchedSelect(selectModels)) {
// Copy `branches` rows to target db
await tables.models(deps.targetDb).insert(models).onConflict().ignore()
// Copy `branches` row to target db
await tables.models(deps.targetDb).insert(model).onConflict().ignore()
}
for (const model of models) {
copiedModelCountByProjectId[model.streamId] ??= 0
copiedModelCountByProjectId[model.streamId]++
}
}
return copiedModelIds
return copiedModelCountByProjectId
}
export const copyProjectVersionsFactory =
(deps: { sourceDb: Knex; targetDb: Knex }): CopyProjectVersions =>
async ({ projectIds }) => {
const copiedVersionIds: Record<string, string[]> = projectIds.reduce(
(result, id) => ({ ...result, [id]: [] }),
{}
)
const copiedVersionCountByProjectId: Record<string, number> = {}
for (const projectId of projectIds) {
const selectVersions = tables
.streamCommits(deps.sourceDb)
.select('*')
.join<StreamCommitRecord & Commit>(
Commits.name,
Commits.col.id,
StreamCommits.col.commitId
)
.where({ streamId: projectId })
const selectVersions = tables
.streamCommits(deps.sourceDb)
.select('*')
.join<StreamCommitRecord & Commit>(
Commits.name,
Commits.col.id,
StreamCommits.col.commitId
)
.whereIn(StreamCommits.col.streamId, projectIds)
for await (const versions of executeBatchedSelect(selectVersions)) {
for (const version of versions) {
for await (const versions of executeBatchedSelect(selectVersions)) {
const { commitIds, commits } = versions.reduce(
(all, version) => {
const { commitId, streamId, ...commit } = version
// Store copied version id
copiedVersionIds[streamId].push(commitId)
all.commitIds.push(commitId)
all.streamIds.push(streamId)
all.commits.push(commit)
// Copy `commits` row to target db
await tables.versions(deps.targetDb).insert(commit).onConflict().ignore()
return all
},
{ commitIds: [], streamIds: [], commits: [] } as {
commitIds: string[]
streamIds: string[]
commits: CommitRecord[]
}
)
const commitIds = versions.map((version) => version.commitId)
// Copy `commits` rows to target db
await tables.versions(deps.targetDb).insert(commits).onConflict().ignore()
// Fetch `branch_commits` rows for versions in batch
const selectBranchCommits = tables
.branchCommits(deps.sourceDb)
.select('*')
.whereIn(BranchCommits.col.commitId, commitIds)
for (const version of versions) {
copiedVersionCountByProjectId[version.streamId] ??= 0
copiedVersionCountByProjectId[version.streamId]++
}
for await (const branchCommits of executeBatchedSelect(selectBranchCommits)) {
for (const branchCommit of branchCommits) {
// Copy `branch_commits` row to target db
await tables
.branchCommits(deps.targetDb)
.insert(branchCommit)
.onConflict()
.ignore()
}
}
// Fetch `branch_commits` rows for versions in batch
const selectBranchCommits = tables
.branchCommits(deps.sourceDb)
.select('*')
.whereIn(BranchCommits.col.commitId, commitIds)
// Fetch `stream_commits` rows for versions in batch
const selectStreamCommits = tables
.streamCommits(deps.sourceDb)
.select('*')
.whereIn(StreamCommits.col.commitId, commitIds)
for await (const branchCommits of executeBatchedSelect(selectBranchCommits)) {
// Copy `branch_commits` row to target db
await tables
.branchCommits(deps.targetDb)
.insert(branchCommits)
.onConflict()
.ignore()
}
for await (const streamCommits of executeBatchedSelect(selectStreamCommits)) {
for (const streamCommit of streamCommits) {
// Copy `stream_commits` row to target db
await tables
.streamCommits(deps.targetDb)
.insert(streamCommit)
.onConflict()
.ignore()
}
}
// Fetch `stream_commits` rows for versions in batch
const selectStreamCommits = tables
.streamCommits(deps.sourceDb)
.select('*')
.whereIn(StreamCommits.col.commitId, commitIds)
for await (const streamCommits of executeBatchedSelect(selectStreamCommits)) {
// Copy `stream_commits` row to target db
await tables
.streamCommits(deps.targetDb)
.insert(streamCommits)
.onConflict()
.ignore()
}
}
return copiedVersionIds
return copiedVersionCountByProjectId
}
@@ -70,9 +70,12 @@ export const updateProjectRegionFactory =
const sourceProjectModelCount = await deps.countProjectModels(projectId)
const sourceProjectVersionCount = await deps.countProjectVersions(projectId)
const isReconciled =
modelIds[projectId].length === sourceProjectModelCount &&
versionIds[projectId].length === sourceProjectVersionCount
const tests = [
modelIds[projectId] === sourceProjectModelCount,
versionIds[projectId] === sourceProjectVersionCount
]
const isReconciled = tests.every((test) => !!test)
if (!isReconciled) {
// TODO: Move failed or source project added data while changing regions. Retry move.
@@ -382,6 +382,7 @@ isMultiRegionTestMode()
expect(res).to.not.haveGraphQLErrors()
// TODO: Replace with gql query when possible
const project = await targetRegionDb
.table<StreamRecord>('streams')
.select('*')
@@ -399,6 +400,7 @@ isMultiRegionTestMode()
expect(res).to.not.haveGraphQLErrors()
// TODO: Replace with gql query when possible
const branch = await targetRegionDb
.table<BranchRecord>('branches')
.select('*')
@@ -416,6 +418,7 @@ isMultiRegionTestMode()
expect(res).to.not.haveGraphQLErrors()
// TODO: Replace with gql query when possible
const version = await targetRegionDb
.table<CommitRecord>('commits')
.select('*')
@@ -423,6 +426,7 @@ isMultiRegionTestMode()
.first()
expect(version).to.not.be.undefined
// TODO: Replace with gql query when possible
const streamCommitsRecord = await targetRegionDb
.table<StreamCommitRecord>('stream_commits')
.select('*')
@@ -430,6 +434,7 @@ isMultiRegionTestMode()
.first()
expect(streamCommitsRecord).to.not.be.undefined
// TODO: Replace with gql query when possible
const branchCommitsRecord = await targetRegionDb
.table<BranchCommitRecord>('branch_commits')
.select('*')
+6
View File
@@ -70,6 +70,11 @@ const parseFeatureFlags = () => {
FF_OBJECTS_STREAMING_FIX: {
schema: z.boolean(),
defaults: { production: false, _: false }
},
// Enables endpoint(s) for updating a project's region
FF_MOVE_PROJECT_REGION_ENABLED: {
schema: z.boolean(),
defaults: { production: false, _: true }
}
})
@@ -98,6 +103,7 @@ export function getFeatureFlags(): {
FF_FORCE_EMAIL_VERIFICATION: boolean
FF_FORCE_ONBOARDING: boolean
FF_OBJECTS_STREAMING_FIX: boolean
FF_MOVE_PROJECT_REGION_ENABLED: boolean
} {
if (!parsedFlags) parsedFlags = parseFeatureFlags()
return parsedFlags