From 0b0a3dca8703d6f72ca25cbacb2aa754a325f8eb Mon Sep 17 00:00:00 2001 From: Kristaps Fabians Geikins Date: Thu, 17 Oct 2024 16:43:38 +0300 Subject: [PATCH] chore(server): core IoC #89 - createObjectsBatchedFactory --- .../modules/core/domain/objects/operations.ts | 10 ++ .../modules/core/repositories/objects.ts | 14 +++ packages/server/modules/core/rest/upload.ts | 20 +++- .../server/modules/core/services/objects.js | 87 +-------------- .../core/services/objects/management.ts | 101 ++++++++++++++++-- .../server/modules/core/utils/chunking.ts | 5 +- 6 files changed, 139 insertions(+), 98 deletions(-) diff --git a/packages/server/modules/core/domain/objects/operations.ts b/packages/server/modules/core/domain/objects/operations.ts index f45eb5f69..fa2b1a955 100644 --- a/packages/server/modules/core/domain/objects/operations.ts +++ b/packages/server/modules/core/domain/objects/operations.ts @@ -35,6 +35,10 @@ export type StoreSingleObjectIfNotFound = ( object: SpeckleObject | InsertableSpeckleObject ) => Promise +export type StoreObjectsIfNotFound = ( + objects: Array +) => Promise + export type StoreClosuresIfNotFound = ( closures: SpeckleObjectClosureEntry[] ) => Promise @@ -44,3 +48,9 @@ export type CreateObject = (params: { object: RawSpeckleObject logger?: Logger }) => Promise + +export type CreateObjectsBatched = (params: { + streamId: string + objects: RawSpeckleObject[] + logger?: Logger +}) => Promise diff --git a/packages/server/modules/core/repositories/objects.ts b/packages/server/modules/core/repositories/objects.ts index 76f1a8408..a252f8771 100644 --- a/packages/server/modules/core/repositories/objects.ts +++ b/packages/server/modules/core/repositories/objects.ts @@ -12,6 +12,7 @@ import { GetStreamObjects, StoreClosuresIfNotFound, StoreObjects, + StoreObjectsIfNotFound, StoreSingleObjectIfNotFound } from '@/modules/core/domain/objects/operations' import { SpeckleObject } from '@/modules/core/domain/objects/types' @@ -85,6 +86,19 @@ export const storeSingleObjectIfNotFoundFactory = .ignore() } +export const storeObjectsIfNotFoundFactory = + (deps: { db: Knex }): StoreObjectsIfNotFound => + async (batch) => { + await tables + .objects(deps.db) + .insert( + // knex is bothered by string being inserted into jsonb, which is actually fine + batch as SpeckleObject[] + ) + .onConflict() + .ignore() + } + export const storeClosuresIfNotFoundFactory = (deps: { db: Knex }): StoreClosuresIfNotFound => async (closuresBatch) => { diff --git a/packages/server/modules/core/rest/upload.ts b/packages/server/modules/core/rest/upload.ts index aa9582bd3..7b8a377d9 100644 --- a/packages/server/modules/core/rest/upload.ts +++ b/packages/server/modules/core/rest/upload.ts @@ -6,24 +6,34 @@ import { getFeatureFlags, maximumObjectUploadFileSizeMb } from '@/modules/shared/helpers/envHelper' -import { - createObjectsBatched, - createObjectsBatchedAndNoClosures -} from '@/modules/core/services/objects' +import { createObjectsBatchedAndNoClosures } from '@/modules/core/services/objects' import { ObjectHandlingError } from '@/modules/core/errors/object' import { estimateStringMegabyteSize } from '@/modules/core/utils/chunking' import { toMegabytesWith1DecimalPlace } from '@/modules/core/utils/formatting' import { Logger } from 'pino' import { Router } from 'express' +import { createObjectsBatchedFactory } from '@/modules/core/services/objects/management' +import { + storeClosuresIfNotFoundFactory, + storeObjectsIfNotFoundFactory +} from '@/modules/core/repositories/objects' +import { db } from '@/db/knex' +import { RawSpeckleObject } from '@/modules/core/domain/objects/types' const MAX_FILE_SIZE = maximumObjectUploadFileSizeMb() * 1024 * 1024 const { FF_NO_CLOSURE_WRITES } = getFeatureFlags() +const createObjectsBatched = createObjectsBatchedFactory({ + storeObjectsIfNotFoundFactory: storeObjectsIfNotFoundFactory({ db }), + storeClosuresIfNotFound: storeClosuresIfNotFoundFactory({ db }) +}) + let objectInsertionService: (params: { streamId: string - objects: unknown[] + objects: RawSpeckleObject[] logger?: Logger }) => Promise = createObjectsBatched + if (FF_NO_CLOSURE_WRITES) { objectInsertionService = createObjectsBatchedAndNoClosures } diff --git a/packages/server/modules/core/services/objects.js b/packages/server/modules/core/services/objects.js index 38257e18e..666546d23 100644 --- a/packages/server/modules/core/services/objects.js +++ b/packages/server/modules/core/services/objects.js @@ -1,84 +1,18 @@ const { performance } = require('perf_hooks') -const { set, get, chunk } = require('lodash') +const { set, get } = require('lodash') const knex = require(`@/db/knex`) const { servicesLogger } = require('@/logging/logging') const { chunkInsertionObjectArray } = require('@/modules/core/utils/chunking') -const { prepInsertionObject } = require('@/modules/core/services/objects/management') +const { + prepInsertionObject, + prepInsertionObjectBatch +} = require('@/modules/core/services/objects/management') const Objects = () => knex('objects') const Closures = () => knex('object_children_closure') module.exports = { - async createObjectsBatched({ streamId, objects, logger = servicesLogger }) { - const closures = [] - const objsToInsert = [] - const ids = [] - - // Prep objects up - objects.forEach((obj) => { - const insertionObject = prepInsertionObject(streamId, obj) - let totalChildrenCountGlobal = 0 - const totalChildrenCountByDepth = {} - - if (obj.__closure !== null) { - for (const prop in obj.__closure) { - closures.push({ - streamId, - parent: insertionObject.id, - child: prop, - minDepth: obj.__closure[prop] - }) - totalChildrenCountGlobal++ - if (totalChildrenCountByDepth[obj.__closure[prop].toString()]) - totalChildrenCountByDepth[obj.__closure[prop].toString()]++ - else totalChildrenCountByDepth[obj.__closure[prop].toString()] = 1 - } - } - - insertionObject.totalChildrenCount = totalChildrenCountGlobal - insertionObject.totalChildrenCountByDepth = JSON.stringify( - totalChildrenCountByDepth - ) - - delete insertionObject.__tree - delete insertionObject.__closure - - objsToInsert.push(insertionObject) - ids.push(insertionObject.id) - }) - - const closureBatchSize = 1000 - const objectsBatchSize = 500 - - // step 1: insert objects - if (objsToInsert.length > 0) { - // const batches = chunk(objsToInsert, objectsBatchSize) - const batches = chunkInsertionObjectArray({ - objects: objsToInsert, - chunkLengthLimit: objectsBatchSize, - chunkSizeLimitMb: 2 - }) - for (const batch of batches) { - prepInsertionObjectBatch(batch) - await Objects().insert(batch).onConflict().ignore() - logger.info({ objectCount: batch.length }, 'Inserted {objectCount} objects') - } - } - - // step 2: insert closures - if (closures.length > 0) { - const batches = chunk(closures, closureBatchSize) - - for (const batch of batches) { - prepInsertionClosureBatch(batch) - await Closures().insert(batch).onConflict().ignore() - logger.info({ batchLength: batch.length }, 'Inserted {batchLength} closures') - } - } - return true - }, - async createObjectsBatchedAndNoClosures({ streamId, objects, @@ -629,14 +563,3 @@ module.exports = { throw new Error('Updating object is not implemented') } } - -// Batches need to be inserted ordered by id to avoid deadlocks -function prepInsertionObjectBatch(batch) { - batch.sort((a, b) => (a.id > b.id ? 1 : -1)) -} - -function prepInsertionClosureBatch(batch) { - batch.sort((a, b) => - a.parent > b.parent ? 1 : a.parent === b.parent ? (a.child > b.child ? 1 : -1) : -1 - ) -} diff --git a/packages/server/modules/core/services/objects/management.ts b/packages/server/modules/core/services/objects/management.ts index 6f5428553..a84d67df7 100644 --- a/packages/server/modules/core/services/objects/management.ts +++ b/packages/server/modules/core/services/objects/management.ts @@ -1,17 +1,24 @@ import crypto from 'crypto' import { InsertableSpeckleObject, - RawSpeckleObject + RawSpeckleObject, + SpeckleObjectClosureEntry } from '@/modules/core/domain/objects/types' import { getMaximumObjectSizeMB } from '@/modules/shared/helpers/envHelper' -import { estimateStringMegabyteSize } from '@/modules/core/utils/chunking' +import { + chunkInsertionObjectArray, + estimateStringMegabyteSize +} from '@/modules/core/utils/chunking' import { ObjectHandlingError } from '@/modules/core/errors/object' import { servicesLogger } from '@/logging/logging' import { CreateObject, + CreateObjectsBatched, StoreClosuresIfNotFound, + StoreObjectsIfNotFound, StoreSingleObjectIfNotFound } from '@/modules/core/domain/objects/operations' +import { chunk } from 'lodash' /** * Note: we're generating the hash here, rather than on the db side, as there are @@ -58,12 +65,7 @@ export const createObjectFactory = async ({ streamId, object, logger = servicesLogger }) => { const insertionObject = prepInsertionObject(streamId, object) - const closures: Array<{ - streamId: string - parent: string - child: string - minDepth: number - }> = [] + const closures: Array = [] const totalChildrenCountByDepth: Record = {} if (object.__closure !== null) { @@ -101,3 +103,86 @@ export const createObjectFactory = return insertionObject.id } + +// Batches need to be inserted ordered by id to avoid deadlocks +export const prepInsertionObjectBatch = (batch: InsertableSpeckleObject[]) => { + batch.sort((a, b) => (a.id > b.id ? 1 : -1)) +} + +export const prepInsertionClosureBatch = (batch: SpeckleObjectClosureEntry[]) => { + batch.sort((a, b) => + a.parent > b.parent ? 1 : a.parent === b.parent ? (a.child > b.child ? 1 : -1) : -1 + ) +} + +export const createObjectsBatchedFactory = + (deps: { + storeObjectsIfNotFoundFactory: StoreObjectsIfNotFound + storeClosuresIfNotFound: StoreClosuresIfNotFound + }): CreateObjectsBatched => + async ({ streamId, objects, logger = servicesLogger }) => { + const closures: SpeckleObjectClosureEntry[] = [] + const objsToInsert: InsertableSpeckleObject[] = [] + const ids: string[] = [] + + // Prep objects up + objects.forEach((obj) => { + const insertionObject = prepInsertionObject(streamId, obj) + let totalChildrenCountGlobal = 0 + const totalChildrenCountByDepth: Record = {} + + if (obj.__closure !== null) { + for (const prop in obj.__closure) { + closures.push({ + streamId, + parent: insertionObject.id, + child: prop, + minDepth: obj.__closure[prop] + }) + totalChildrenCountGlobal++ + if (totalChildrenCountByDepth[obj.__closure[prop].toString()]) + totalChildrenCountByDepth[obj.__closure[prop].toString()]++ + else totalChildrenCountByDepth[obj.__closure[prop].toString()] = 1 + } + } + + const finalInsertionObject: InsertableSpeckleObject = { + ...insertionObject, + totalChildrenCount: totalChildrenCountGlobal, + totalChildrenCountByDepth: JSON.stringify(totalChildrenCountByDepth) + } + + objsToInsert.push(finalInsertionObject) + ids.push(insertionObject.id) + }) + + const closureBatchSize = 1000 + const objectsBatchSize = 500 + + // step 1: insert objects + if (objsToInsert.length > 0) { + // const batches = chunk(objsToInsert, objectsBatchSize) + const batches = chunkInsertionObjectArray({ + objects: objsToInsert, + chunkLengthLimit: objectsBatchSize, + chunkSizeLimitMb: 2 + }) + for (const batch of batches) { + prepInsertionObjectBatch(batch) + await deps.storeObjectsIfNotFoundFactory(batch) + logger.info({ objectCount: batch.length }, 'Inserted {objectCount} objects') + } + } + + // step 2: insert closures + if (closures.length > 0) { + const batches = chunk(closures, closureBatchSize) + + for (const batch of batches) { + prepInsertionClosureBatch(batch) + await deps.storeClosuresIfNotFound(batch) + logger.info({ batchLength: batch.length }, 'Inserted {batchLength} closures') + } + } + return true + } diff --git a/packages/server/modules/core/utils/chunking.ts b/packages/server/modules/core/utils/chunking.ts index 40b34c833..71f3572fe 100644 --- a/packages/server/modules/core/utils/chunking.ts +++ b/packages/server/modules/core/utils/chunking.ts @@ -1,9 +1,8 @@ +import { InsertableSpeckleObject } from '@/modules/core/domain/objects/types' import { BaseError } from '@/modules/shared/errors' import { Options } from 'verror' -type InsertionObject = { - data: string -} +type InsertionObject = InsertableSpeckleObject export class ArgumentError extends BaseError { static defaultMessage = 'Invalid argument value provided'