From 04edd9c6af29aa3fd8dddf7ff751ccd8c744b52c Mon Sep 17 00:00:00 2001 From: Chuck Driesler Date: Thu, 6 Feb 2025 16:59:44 +0000 Subject: [PATCH] fix(regions): speed up inserts --- .../modules/workspaces/domain/operations.ts | 4 +- .../workspaces/graph/resolvers/regions.ts | 8 + .../workspaces/repositories/projectRegions.ts | 192 +++++++++--------- .../workspaces/services/projectRegions.ts | 9 +- .../tests/integration/projects.graph.spec.ts | 5 + packages/shared/src/environment/index.ts | 6 + 6 files changed, 124 insertions(+), 100 deletions(-) diff --git a/packages/server/modules/workspaces/domain/operations.ts b/packages/server/modules/workspaces/domain/operations.ts index 83ee87e85..66f0983c3 100644 --- a/packages/server/modules/workspaces/domain/operations.ts +++ b/packages/server/modules/workspaces/domain/operations.ts @@ -359,7 +359,7 @@ export type CopyWorkspace = (params: { workspaceId: string }) => Promise export type CopyProjects = (params: { projectIds: string[] }) => Promise export type CopyProjectModels = (params: { projectIds: string[] -}) => Promise> +}) => Promise> export type CopyProjectVersions = (params: { projectIds: string[] -}) => Promise> +}) => Promise> diff --git a/packages/server/modules/workspaces/graph/resolvers/regions.ts b/packages/server/modules/workspaces/graph/resolvers/regions.ts index 73770ec14..bb01c8195 100644 --- a/packages/server/modules/workspaces/graph/resolvers/regions.ts +++ b/packages/server/modules/workspaces/graph/resolvers/regions.ts @@ -29,6 +29,10 @@ import { getProjectFactory } from '@/modules/core/repositories/projects' import { getStreamBranchCountFactory } from '@/modules/core/repositories/branches' import { getStreamCommitCountFactory } from '@/modules/core/repositories/commits' import { withTransaction } from '@/modules/shared/helpers/dbHelper' +import { getFeatureFlags, isTestEnv } from '@/modules/shared/helpers/envHelper' +import { WorkspacesNotYetImplementedError } from '@/modules/workspaces/errors/workspace' + +const { FF_MOVE_PROJECT_REGION_ENABLED } = getFeatureFlags() export default { Workspace: { @@ -67,6 +71,10 @@ export default { }, WorkspaceProjectMutations: { moveToRegion: async (_parent, args, context) => { + if (!FF_MOVE_PROJECT_REGION_ENABLED && !isTestEnv()) { + throw new WorkspacesNotYetImplementedError() + } + await authorizeResolver( context.userId, args.projectId, diff --git a/packages/server/modules/workspaces/repositories/projectRegions.ts b/packages/server/modules/workspaces/repositories/projectRegions.ts index 94da98d14..4c7e5eee7 100644 --- a/packages/server/modules/workspaces/repositories/projectRegions.ts +++ b/packages/server/modules/workspaces/repositories/projectRegions.ts @@ -12,8 +12,10 @@ import { Commit } from '@/modules/core/domain/commits/types' import { Stream } from '@/modules/core/domain/streams/types' import { BranchCommitRecord, + CommitRecord, StreamCommitRecord, - StreamFavoriteRecord + StreamFavoriteRecord, + StreamRecord } from '@/modules/core/helpers/types' import { executeBatchedSelect } from '@/modules/shared/helpers/dbHelper' import { @@ -50,7 +52,11 @@ export const copyWorkspaceFactory = throw new WorkspaceNotFoundError() } - await tables.workspaces(deps.targetDb).insert(workspace) + await tables + .workspaces(deps.targetDb) + .insert(workspace) + .onConflict(Workspaces.withoutTablePrefix.col.id) + .merge(Workspaces.withoutTablePrefix.cols as (keyof Workspace)[]) return workspaceId } @@ -66,15 +72,15 @@ export const copyProjectsFactory = // Copy project record for await (const projects of executeBatchedSelect(selectProjects)) { - for (const project of projects) { - // Store copied project id - copiedProjectIds.push(project.id) - - // Copy `streams` row to target db - await tables.projects(deps.targetDb).insert(project).onConflict().ignore() - } - const projectIds = projects.map((project) => project.id) + copiedProjectIds.push(...projectIds) + + // Copy `streams` rows to target db + await tables + .projects(deps.targetDb) + .insert(projects) + .onConflict(Streams.withoutTablePrefix.col.id) + .merge(Streams.withoutTablePrefix.cols as (keyof StreamRecord)[]) // Fetch `stream_favorites` rows for projects in batch const selectStreamFavorites = tables @@ -83,14 +89,12 @@ export const copyProjectsFactory = .whereIn(StreamFavorites.col.streamId, projectIds) for await (const streamFavorites of executeBatchedSelect(selectStreamFavorites)) { - for (const streamFavorite of streamFavorites) { - // Copy `stream_favorites` row to target db - await tables - .streamFavorites(deps.targetDb) - .insert(streamFavorite) - .onConflict() - .ignore() - } + // Copy `stream_favorites` rows to target db + await tables + .streamFavorites(deps.targetDb) + .insert(streamFavorites) + .onConflict() + .ignore() } // Fetch `streams_meta` rows for projects in batch @@ -102,14 +106,12 @@ export const copyProjectsFactory = for await (const streamsMetadataBatch of executeBatchedSelect( selectStreamsMetadata )) { - for (const streamMetadata of streamsMetadataBatch) { - // Copy `streams_meta` row to target db - await tables - .streamsMeta(deps.targetDb) - .insert(streamMetadata) - .onConflict() - .ignore() - } + // Copy `streams_meta` rows to target db + await tables + .streamsMeta(deps.targetDb) + .insert(streamsMetadataBatch) + .onConflict() + .ignore() } } @@ -119,98 +121,98 @@ export const copyProjectsFactory = export const copyProjectModelsFactory = (deps: { sourceDb: Knex; targetDb: Knex }): CopyProjectModels => async ({ projectIds }) => { - const copiedModelIds: Record = projectIds.reduce( - (result, id) => ({ ...result, [id]: [] }), - {} - ) + const copiedModelCountByProjectId: Record = {} - for (const projectId of projectIds) { - const selectModels = tables - .models(deps.sourceDb) - .select('*') - .where({ streamId: projectId }) + // Fetch `branches` rows for projects in batch + const selectModels = tables + .models(deps.sourceDb) + .select('*') + .whereIn(Branches.col.streamId, projectIds) - for await (const models of executeBatchedSelect(selectModels)) { - for (const model of models) { - // Store copied model ids - copiedModelIds[projectId].push(model.id) + for await (const models of executeBatchedSelect(selectModels)) { + // Copy `branches` rows to target db + await tables.models(deps.targetDb).insert(models).onConflict().ignore() - // Copy `branches` row to target db - await tables.models(deps.targetDb).insert(model).onConflict().ignore() - } + for (const model of models) { + copiedModelCountByProjectId[model.streamId] ??= 0 + copiedModelCountByProjectId[model.streamId]++ } } - return copiedModelIds + return copiedModelCountByProjectId } export const copyProjectVersionsFactory = (deps: { sourceDb: Knex; targetDb: Knex }): CopyProjectVersions => async ({ projectIds }) => { - const copiedVersionIds: Record = projectIds.reduce( - (result, id) => ({ ...result, [id]: [] }), - {} - ) + const copiedVersionCountByProjectId: Record = {} - for (const projectId of projectIds) { - const selectVersions = tables - .streamCommits(deps.sourceDb) - .select('*') - .join( - Commits.name, - Commits.col.id, - StreamCommits.col.commitId - ) - .where({ streamId: projectId }) + const selectVersions = tables + .streamCommits(deps.sourceDb) + .select('*') + .join( + Commits.name, + Commits.col.id, + StreamCommits.col.commitId + ) + .whereIn(StreamCommits.col.streamId, projectIds) - for await (const versions of executeBatchedSelect(selectVersions)) { - for (const version of versions) { + for await (const versions of executeBatchedSelect(selectVersions)) { + const { commitIds, commits } = versions.reduce( + (all, version) => { const { commitId, streamId, ...commit } = version - // Store copied version id - copiedVersionIds[streamId].push(commitId) + all.commitIds.push(commitId) + all.streamIds.push(streamId) + all.commits.push(commit) - // Copy `commits` row to target db - await tables.versions(deps.targetDb).insert(commit).onConflict().ignore() + return all + }, + { commitIds: [], streamIds: [], commits: [] } as { + commitIds: string[] + streamIds: string[] + commits: CommitRecord[] } + ) - const commitIds = versions.map((version) => version.commitId) + // Copy `commits` rows to target db + await tables.versions(deps.targetDb).insert(commits).onConflict().ignore() - // Fetch `branch_commits` rows for versions in batch - const selectBranchCommits = tables - .branchCommits(deps.sourceDb) - .select('*') - .whereIn(BranchCommits.col.commitId, commitIds) + for (const version of versions) { + copiedVersionCountByProjectId[version.streamId] ??= 0 + copiedVersionCountByProjectId[version.streamId]++ + } - for await (const branchCommits of executeBatchedSelect(selectBranchCommits)) { - for (const branchCommit of branchCommits) { - // Copy `branch_commits` row to target db - await tables - .branchCommits(deps.targetDb) - .insert(branchCommit) - .onConflict() - .ignore() - } - } + // Fetch `branch_commits` rows for versions in batch + const selectBranchCommits = tables + .branchCommits(deps.sourceDb) + .select('*') + .whereIn(BranchCommits.col.commitId, commitIds) - // Fetch `stream_commits` rows for versions in batch - const selectStreamCommits = tables - .streamCommits(deps.sourceDb) - .select('*') - .whereIn(StreamCommits.col.commitId, commitIds) + for await (const branchCommits of executeBatchedSelect(selectBranchCommits)) { + // Copy `branch_commits` row to target db + await tables + .branchCommits(deps.targetDb) + .insert(branchCommits) + .onConflict() + .ignore() + } - for await (const streamCommits of executeBatchedSelect(selectStreamCommits)) { - for (const streamCommit of streamCommits) { - // Copy `stream_commits` row to target db - await tables - .streamCommits(deps.targetDb) - .insert(streamCommit) - .onConflict() - .ignore() - } - } + // Fetch `stream_commits` rows for versions in batch + const selectStreamCommits = tables + .streamCommits(deps.sourceDb) + .select('*') + .whereIn(StreamCommits.col.commitId, commitIds) + + for await (const streamCommits of executeBatchedSelect(selectStreamCommits)) { + // Copy `stream_commits` row to target db + await tables + .streamCommits(deps.targetDb) + .insert(streamCommits) + .onConflict() + .ignore() } } - return copiedVersionIds + return copiedVersionCountByProjectId } diff --git a/packages/server/modules/workspaces/services/projectRegions.ts b/packages/server/modules/workspaces/services/projectRegions.ts index 561aee792..2bc60cd66 100644 --- a/packages/server/modules/workspaces/services/projectRegions.ts +++ b/packages/server/modules/workspaces/services/projectRegions.ts @@ -70,9 +70,12 @@ export const updateProjectRegionFactory = const sourceProjectModelCount = await deps.countProjectModels(projectId) const sourceProjectVersionCount = await deps.countProjectVersions(projectId) - const isReconciled = - modelIds[projectId].length === sourceProjectModelCount && - versionIds[projectId].length === sourceProjectVersionCount + const tests = [ + modelIds[projectId] === sourceProjectModelCount, + versionIds[projectId] === sourceProjectVersionCount + ] + + const isReconciled = tests.every((test) => !!test) if (!isReconciled) { // TODO: Move failed or source project added data while changing regions. Retry move. diff --git a/packages/server/modules/workspaces/tests/integration/projects.graph.spec.ts b/packages/server/modules/workspaces/tests/integration/projects.graph.spec.ts index 9b33dc0ad..985e1c6ee 100644 --- a/packages/server/modules/workspaces/tests/integration/projects.graph.spec.ts +++ b/packages/server/modules/workspaces/tests/integration/projects.graph.spec.ts @@ -382,6 +382,7 @@ isMultiRegionTestMode() expect(res).to.not.haveGraphQLErrors() + // TODO: Replace with gql query when possible const project = await targetRegionDb .table('streams') .select('*') @@ -399,6 +400,7 @@ isMultiRegionTestMode() expect(res).to.not.haveGraphQLErrors() + // TODO: Replace with gql query when possible const branch = await targetRegionDb .table('branches') .select('*') @@ -416,6 +418,7 @@ isMultiRegionTestMode() expect(res).to.not.haveGraphQLErrors() + // TODO: Replace with gql query when possible const version = await targetRegionDb .table('commits') .select('*') @@ -423,6 +426,7 @@ isMultiRegionTestMode() .first() expect(version).to.not.be.undefined + // TODO: Replace with gql query when possible const streamCommitsRecord = await targetRegionDb .table('stream_commits') .select('*') @@ -430,6 +434,7 @@ isMultiRegionTestMode() .first() expect(streamCommitsRecord).to.not.be.undefined + // TODO: Replace with gql query when possible const branchCommitsRecord = await targetRegionDb .table('branch_commits') .select('*') diff --git a/packages/shared/src/environment/index.ts b/packages/shared/src/environment/index.ts index 0d35eacdb..d81d9fc27 100644 --- a/packages/shared/src/environment/index.ts +++ b/packages/shared/src/environment/index.ts @@ -70,6 +70,11 @@ const parseFeatureFlags = () => { FF_OBJECTS_STREAMING_FIX: { schema: z.boolean(), defaults: { production: false, _: false } + }, + // Enables endpoint(s) for updating a project's region + FF_MOVE_PROJECT_REGION_ENABLED: { + schema: z.boolean(), + defaults: { production: false, _: true } } }) @@ -98,6 +103,7 @@ export function getFeatureFlags(): { FF_FORCE_EMAIL_VERIFICATION: boolean FF_FORCE_ONBOARDING: boolean FF_OBJECTS_STREAMING_FIX: boolean + FF_MOVE_PROJECT_REGION_ENABLED: boolean } { if (!parsedFlags) parsedFlags = parseFeatureFlags() return parsedFlags