chore(server): core IoC #89 - createObjectsBatchedFactory

This commit is contained in:
Kristaps Fabians Geikins
2024-10-17 16:43:38 +03:00
parent 486a3857b0
commit 0b0a3dca87
6 changed files with 139 additions and 98 deletions
@@ -35,6 +35,10 @@ export type StoreSingleObjectIfNotFound = (
object: SpeckleObject | InsertableSpeckleObject
) => Promise<void>
/**
 * Stores a whole batch of objects in one call.
 * As the name indicates, objects that are already stored are expected to be left as they are.
 */
export type StoreObjectsIfNotFound = (
  objects: (SpeckleObject | InsertableSpeckleObject)[]
) => Promise<void>
/**
 * Stores a batch of closure entries in one call.
 * As the name indicates, entries that are already stored are expected to be left as they are.
 */
export type StoreClosuresIfNotFound = (
  closures: Array<SpeckleObjectClosureEntry>
) => Promise<void>
@@ -44,3 +48,9 @@ export type CreateObject = (params: {
object: RawSpeckleObject
logger?: Logger
}) => Promise<string>
/**
 * Creates many objects for a single stream in one call.
 *
 * - `streamId`: stream the objects belong to
 * - `objects`: raw objects to create
 * - `logger`: optional logger to report progress to
 *
 * Resolves to a boolean result flag.
 */
export type CreateObjectsBatched = (params: {
  streamId: string
  objects: Array<RawSpeckleObject>
  logger?: Logger
}) => Promise<boolean>
@@ -12,6 +12,7 @@ import {
GetStreamObjects,
StoreClosuresIfNotFound,
StoreObjects,
StoreObjectsIfNotFound,
StoreSingleObjectIfNotFound
} from '@/modules/core/domain/objects/operations'
import { SpeckleObject } from '@/modules/core/domain/objects/types'
@@ -85,6 +86,19 @@ export const storeSingleObjectIfNotFoundFactory =
.ignore()
}
/**
 * Builds a `StoreObjectsIfNotFound` implementation backed by the given Knex instance.
 *
 * The returned function inserts the whole batch into the objects table with a single
 * insert statement and ignores rows that hit a conflict.
 */
export const storeObjectsIfNotFoundFactory = (deps: {
  db: Knex
}): StoreObjectsIfNotFound => {
  return async (objects) => {
    // knex is bothered by string being inserted into jsonb, which is actually fine
    const rows = objects as SpeckleObject[]
    const objectsTable = tables.objects(deps.db)
    await objectsTable.insert(rows).onConflict().ignore()
  }
}
export const storeClosuresIfNotFoundFactory =
(deps: { db: Knex }): StoreClosuresIfNotFound =>
async (closuresBatch) => {
+15 -5
View File
@@ -6,24 +6,34 @@ import {
getFeatureFlags,
maximumObjectUploadFileSizeMb
} from '@/modules/shared/helpers/envHelper'
import {
createObjectsBatched,
createObjectsBatchedAndNoClosures
} from '@/modules/core/services/objects'
import { createObjectsBatchedAndNoClosures } from '@/modules/core/services/objects'
import { ObjectHandlingError } from '@/modules/core/errors/object'
import { estimateStringMegabyteSize } from '@/modules/core/utils/chunking'
import { toMegabytesWith1DecimalPlace } from '@/modules/core/utils/formatting'
import { Logger } from 'pino'
import { Router } from 'express'
import { createObjectsBatchedFactory } from '@/modules/core/services/objects/management'
import {
storeClosuresIfNotFoundFactory,
storeObjectsIfNotFoundFactory
} from '@/modules/core/repositories/objects'
import { db } from '@/db/knex'
import { RawSpeckleObject } from '@/modules/core/domain/objects/types'
// Upload size limit: the configured value multiplied by 1024 * 1024
// (presumably a megabyte setting converted to bytes, going by the helper's name).
const MAX_FILE_SIZE = maximumObjectUploadFileSizeMb() * 1024 * 1024
const { FF_NO_CLOSURE_WRITES } = getFeatureFlags()
// Batched object creation service, wired to the db-backed repository functions.
// Note that the `storeObjectsIfNotFoundFactory` key receives the already-built store function.
const createObjectsBatched = createObjectsBatchedFactory({
storeObjectsIfNotFoundFactory: storeObjectsIfNotFoundFactory({ db }),
storeClosuresIfNotFound: storeClosuresIfNotFoundFactory({ db })
})
// Service used to insert uploaded objects; defaults to the closure-writing implementation.
let objectInsertionService: (params: {
streamId: string
// NOTE(review): the next two `objects` lines look like the removed and added sides of the
// same diff line; `RawSpeckleObject[]` appears to be the post-change type - confirm.
objects: unknown[]
objects: RawSpeckleObject[]
logger?: Logger
}) => Promise<boolean | string[]> = createObjectsBatched
// When the FF_NO_CLOSURE_WRITES feature flag is on, switch to the implementation that
// (per its name) does not write closures.
if (FF_NO_CLOSURE_WRITES) {
objectInsertionService = createObjectsBatchedAndNoClosures
}
@@ -1,84 +1,18 @@
const { performance } = require('perf_hooks')
const { set, get, chunk } = require('lodash')
const { set, get } = require('lodash')
const knex = require(`@/db/knex`)
const { servicesLogger } = require('@/logging/logging')
const { chunkInsertionObjectArray } = require('@/modules/core/utils/chunking')
const { prepInsertionObject } = require('@/modules/core/services/objects/management')
const {
prepInsertionObject,
prepInsertionObjectBatch
} = require('@/modules/core/services/objects/management')
// Query builder accessors: each call starts a fresh knex query on the named table.
const Objects = () => knex('objects')
const Closures = () => knex('object_children_closure')
module.exports = {
async createObjectsBatched({ streamId, objects, logger = servicesLogger }) {
const closures = []
const objsToInsert = []
const ids = []
// Prep objects up
objects.forEach((obj) => {
const insertionObject = prepInsertionObject(streamId, obj)
let totalChildrenCountGlobal = 0
const totalChildrenCountByDepth = {}
if (obj.__closure !== null) {
for (const prop in obj.__closure) {
closures.push({
streamId,
parent: insertionObject.id,
child: prop,
minDepth: obj.__closure[prop]
})
totalChildrenCountGlobal++
if (totalChildrenCountByDepth[obj.__closure[prop].toString()])
totalChildrenCountByDepth[obj.__closure[prop].toString()]++
else totalChildrenCountByDepth[obj.__closure[prop].toString()] = 1
}
}
insertionObject.totalChildrenCount = totalChildrenCountGlobal
insertionObject.totalChildrenCountByDepth = JSON.stringify(
totalChildrenCountByDepth
)
delete insertionObject.__tree
delete insertionObject.__closure
objsToInsert.push(insertionObject)
ids.push(insertionObject.id)
})
const closureBatchSize = 1000
const objectsBatchSize = 500
// step 1: insert objects
if (objsToInsert.length > 0) {
// const batches = chunk(objsToInsert, objectsBatchSize)
const batches = chunkInsertionObjectArray({
objects: objsToInsert,
chunkLengthLimit: objectsBatchSize,
chunkSizeLimitMb: 2
})
for (const batch of batches) {
prepInsertionObjectBatch(batch)
await Objects().insert(batch).onConflict().ignore()
logger.info({ objectCount: batch.length }, 'Inserted {objectCount} objects')
}
}
// step 2: insert closures
if (closures.length > 0) {
const batches = chunk(closures, closureBatchSize)
for (const batch of batches) {
prepInsertionClosureBatch(batch)
await Closures().insert(batch).onConflict().ignore()
logger.info({ batchLength: batch.length }, 'Inserted {batchLength} closures')
}
}
return true
},
async createObjectsBatchedAndNoClosures({
streamId,
objects,
@@ -629,14 +563,3 @@ module.exports = {
throw new Error('Updating object is not implemented')
}
}
// Batches need to be inserted ordered by id to avoid deadlocks
function prepInsertionObjectBatch(batch) {
batch.sort((a, b) => (a.id > b.id ? 1 : -1))
}
/**
 * Sorts a closure batch in place by ascending parent id, then by ascending child id.
 *
 * @param {Array<{ parent: string, child: string }>} batch rows to sort; the array itself is mutated
 */
function prepInsertionClosureBatch(batch) {
  batch.sort((a, b) => {
    if (a.parent !== b.parent) return a.parent > b.parent ? 1 : -1
    if (a.child !== b.child) return a.child > b.child ? 1 : -1
    // Identical (parent, child) pairs compare as equal so the comparator stays consistent
    return 0
  })
}
@@ -1,17 +1,24 @@
import crypto from 'crypto'
import {
InsertableSpeckleObject,
RawSpeckleObject
RawSpeckleObject,
SpeckleObjectClosureEntry
} from '@/modules/core/domain/objects/types'
import { getMaximumObjectSizeMB } from '@/modules/shared/helpers/envHelper'
import { estimateStringMegabyteSize } from '@/modules/core/utils/chunking'
import {
chunkInsertionObjectArray,
estimateStringMegabyteSize
} from '@/modules/core/utils/chunking'
import { ObjectHandlingError } from '@/modules/core/errors/object'
import { servicesLogger } from '@/logging/logging'
import {
CreateObject,
CreateObjectsBatched,
StoreClosuresIfNotFound,
StoreObjectsIfNotFound,
StoreSingleObjectIfNotFound
} from '@/modules/core/domain/objects/operations'
import { chunk } from 'lodash'
/**
* Note: we're generating the hash here, rather than on the db side, as there are
@@ -58,12 +65,7 @@ export const createObjectFactory =
async ({ streamId, object, logger = servicesLogger }) => {
const insertionObject = prepInsertionObject(streamId, object)
const closures: Array<{
streamId: string
parent: string
child: string
minDepth: number
}> = []
const closures: Array<SpeckleObjectClosureEntry> = []
const totalChildrenCountByDepth: Record<string, number> = {}
if (object.__closure !== null) {
@@ -101,3 +103,86 @@ export const createObjectFactory =
return insertionObject.id
}
/**
 * Sorts an insertion batch in place by ascending object id.
 * Batches need to be inserted ordered by id to avoid deadlocks.
 *
 * @param batch rows to sort; the array itself is mutated
 */
export const prepInsertionObjectBatch = (batch: InsertableSpeckleObject[]) => {
  // Return 0 for equal ids so the comparator is consistent and rows with the same id
  // keep their relative order
  batch.sort((a, b) => (a.id > b.id ? 1 : a.id < b.id ? -1 : 0))
}
/**
 * Sorts a closure batch in place by ascending parent id, then by ascending child id.
 *
 * @param batch closure entries to sort; the array itself is mutated
 */
export const prepInsertionClosureBatch = (batch: SpeckleObjectClosureEntry[]) => {
  batch.sort((a, b) => {
    if (a.parent !== b.parent) return a.parent > b.parent ? 1 : -1
    if (a.child !== b.child) return a.child > b.child ? 1 : -1
    // Identical (parent, child) pairs compare as equal so the comparator stays consistent
    return 0
  })
}
/**
 * Builds the batched object creation service.
 *
 * The returned function prepares every raw object for insertion, derives closure entries
 * and child counts from each object's `__closure` map, and then stores objects first and
 * closures second. Both are written in chunks, each chunk sorted before it is stored.
 *
 * @param deps.storeObjectsIfNotFoundFactory the object store function itself (despite the
 *   key's name it is called directly with a batch; the key is kept for existing callers)
 * @param deps.storeClosuresIfNotFound the closure store function
 * @returns a function that resolves to `true` once every chunk has been stored
 */
export const createObjectsBatchedFactory =
  (deps: {
    storeObjectsIfNotFoundFactory: StoreObjectsIfNotFound
    storeClosuresIfNotFound: StoreClosuresIfNotFound
  }): CreateObjectsBatched =>
  async ({ streamId, objects, logger = servicesLogger }) => {
    const closureBatchSize = 1000
    const objectsBatchSize = 500

    const closures: SpeckleObjectClosureEntry[] = []
    const objsToInsert: InsertableSpeckleObject[] = []

    // Prep objects up
    objects.forEach((obj) => {
      const insertionObject = prepInsertionObject(streamId, obj)

      let totalChildrenCount = 0
      const totalChildrenCountByDepth: Record<string, number> = {}

      if (obj.__closure !== null) {
        for (const child in obj.__closure) {
          const minDepth = obj.__closure[child]
          closures.push({
            streamId,
            parent: insertionObject.id,
            child,
            minDepth
          })

          totalChildrenCount++

          // Tally how many children sit at each depth
          const depthKey = minDepth.toString()
          totalChildrenCountByDepth[depthKey] =
            (totalChildrenCountByDepth[depthKey] ?? 0) + 1
        }
      }

      const finalInsertionObject: InsertableSpeckleObject = {
        ...insertionObject,
        totalChildrenCount,
        totalChildrenCountByDepth: JSON.stringify(totalChildrenCountByDepth)
      }
      objsToInsert.push(finalInsertionObject)
    })

    // step 1: insert objects
    if (objsToInsert.length > 0) {
      const batches = chunkInsertionObjectArray({
        objects: objsToInsert,
        chunkLengthLimit: objectsBatchSize,
        chunkSizeLimitMb: 2
      })
      for (const batch of batches) {
        // Sorted by id before storing, to avoid deadlocks
        prepInsertionObjectBatch(batch)
        await deps.storeObjectsIfNotFoundFactory(batch)
        logger.info({ objectCount: batch.length }, 'Inserted {objectCount} objects')
      }
    }

    // step 2: insert closures
    if (closures.length > 0) {
      const batches = chunk(closures, closureBatchSize)
      for (const batch of batches) {
        prepInsertionClosureBatch(batch)
        await deps.storeClosuresIfNotFound(batch)
        logger.info({ batchLength: batch.length }, 'Inserted {batchLength} closures')
      }
    }

    return true
  }
@@ -1,9 +1,8 @@
import { InsertableSpeckleObject } from '@/modules/core/domain/objects/types'
import { BaseError } from '@/modules/shared/errors'
import { Options } from 'verror'
// NOTE(review): two declarations of `InsertionObject` appear here. They look like the removed
// (inline `{ data: string }`) and added (alias of `InsertableSpeckleObject`) sides of the same
// diff hunk - confirm that only the alias remains in the real file.
type InsertionObject = {
data: string
}
type InsertionObject = InsertableSpeckleObject
export class ArgumentError extends BaseError {
static defaultMessage = 'Invalid argument value provided'