diff --git a/packages/server/modules/core/domain/commits/operations.ts b/packages/server/modules/core/domain/commits/operations.ts index b815039be..549ab4f63 100644 --- a/packages/server/modules/core/domain/commits/operations.ts +++ b/packages/server/modules/core/domain/commits/operations.ts @@ -9,6 +9,7 @@ import { UpdateVersionInput } from '@/modules/core/graph/generated/graphql' import { BranchCommitRecord, StreamCommitRecord } from '@/modules/core/helpers/types' +import { BatchedSelectOptions } from '@/modules/shared/helpers/dbHelper' import { MaybeNullOrUndefined, Nullable, Optional } from '@speckle/shared' import { Knex } from 'knex' @@ -153,3 +154,20 @@ export type GetUserAuthoredCommitCounts = (params: { export type GetCommitsAndTheirBranchIds = ( commitIds: string[] ) => Promise + +export type GetBatchedStreamCommits = ( + streamId: string, + options?: Partial +) => AsyncGenerator + +export type GetBatchedBranchCommits = ( + branchIds: string[], + options?: Partial +) => AsyncGenerator + +export type InsertCommits = ( + commits: Commit[], + options?: Partial<{ + trx: Knex.Transaction + }> +) => Promise diff --git a/packages/server/modules/core/repositories/commits.ts b/packages/server/modules/core/repositories/commits.ts index 0b5afdb16..edae68a5b 100644 --- a/packages/server/modules/core/repositories/commits.ts +++ b/packages/server/modules/core/repositories/commits.ts @@ -41,7 +41,10 @@ import { GetStreamCommitCount, GetUserStreamCommitCounts, GetUserAuthoredCommitCounts, - GetCommitsAndTheirBranchIds + GetCommitsAndTheirBranchIds, + GetBatchedStreamCommits, + GetBatchedBranchCommits, + InsertCommits } from '@/modules/core/domain/commits/operations' const tables = { @@ -133,38 +136,38 @@ export const deleteCommitFactory = return !!delCount } -export function getBatchedStreamCommits( - streamId: string, - options?: Partial -) { - const baseQuery = Commits.knex() - .select(Commits.cols) - .innerJoin(StreamCommits.name, StreamCommits.col.commitId, Commits.col.id) - .where(StreamCommits.col.streamId, streamId) - .orderBy(Commits.col.id) +export const getBatchedStreamCommitsFactory = + (deps: { db: Knex }): GetBatchedStreamCommits => + (streamId: string, options?: Partial) => { + const baseQuery = tables + .commits(deps.db) + .select(Commits.cols) + .innerJoin(StreamCommits.name, StreamCommits.col.commitId, Commits.col.id) + .where(StreamCommits.col.streamId, streamId) + .orderBy(Commits.col.id) - return executeBatchedSelect(baseQuery, options) -} + return executeBatchedSelect(baseQuery, options) + } -export function getBatchedBranchCommits( - branchIds: string[], - options?: Partial -) { - const baseQuery = BranchCommits.knex() - .whereIn(BranchCommits.col.branchId, branchIds) - .orderBy(BranchCommits.col.branchId) +export const getBatchedBranchCommitsFactory = + (deps: { db: Knex }): GetBatchedBranchCommits => + (branchIds: string[], options?: Partial) => { + const baseQuery = tables + .branchCommits(deps.db) + .select('*') + .whereIn(BranchCommits.col.branchId, branchIds) + .orderBy(BranchCommits.col.branchId) - return executeBatchedSelect(baseQuery, options) -} + return executeBatchedSelect(baseQuery, options) + } -export async function insertCommits( - commits: CommitRecord[], - options?: Partial<{ trx: Knex.Transaction }> -) { - const q = Commits.knex().insert(commits) - if (options?.trx) q.transacting(options.trx) - return await q -} +export const insertCommitsFactory = + (deps: { db: Knex }): InsertCommits => + async (commits: CommitRecord[], options?: Partial<{ trx: Knex.Transaction }>) => { + const q = tables.commits(deps.db).insert(commits) + if (options?.trx) q.transacting(options.trx) + return await q + } export const insertStreamCommitsFactory = (deps: { db: Knex }): InsertStreamCommits => diff --git a/packages/server/modules/core/services/streams/clone.ts b/packages/server/modules/core/services/streams/clone.ts index c17352ab6..72bf21301 100644 --- a/packages/server/modules/core/services/streams/clone.ts +++ b/packages/server/modules/core/services/streams/clone.ts @@ -15,12 +15,12 @@ import { insertObjects } from '@/modules/core/repositories/objects' import { - getBatchedStreamCommits, generateCommitId, - insertCommits, - getBatchedBranchCommits, insertStreamCommitsFactory, - insertBranchCommitsFactory + insertBranchCommitsFactory, + getBatchedStreamCommitsFactory, + getBatchedBranchCommitsFactory, + insertCommitsFactory } from '@/modules/core/repositories/commits' import { chunk } from 'lodash' import { @@ -165,6 +165,8 @@ async function cloneCommits(state: CloneStreamInitialState) { // oldCommitId/newCommitId const commitIdMap = new Map() + const insertCommits = insertCommitsFactory({ db }) + const getBatchedStreamCommits = getBatchedStreamCommitsFactory({ db }) for await (const commitsBatch of getBatchedStreamCommits(state.targetStream.id, { trx: state.trx })) { @@ -237,6 +239,8 @@ async function createBranchCommitReferences( branchIdMap: Map ) { const oldBranchIds = [...branchIdMap.keys()] + const getBatchedBranchCommits = getBatchedBranchCommitsFactory({ db }) + for await (const branchCommits of getBatchedBranchCommits(oldBranchIds, { trx: state.trx })) {