From 85f74a4ea4e79d17035800839d54560620c36765 Mon Sep 17 00:00:00 2001 From: Dimitrie Stefanescu Date: Mon, 17 Jun 2024 13:30:46 +0100 Subject: [PATCH] feat(objs): adds feature flag for no closure writing --- packages/server/modules/core/rest/upload.js | 87 +++++++++++-------- .../server/modules/core/services/objects.js | 31 +++++++ packages/shared/src/environment/index.ts | 7 +- 3 files changed, 87 insertions(+), 38 deletions(-) diff --git a/packages/server/modules/core/rest/upload.js b/packages/server/modules/core/rest/upload.js index 868d928e8..ceb06a8f3 100644 --- a/packages/server/modules/core/rest/upload.js +++ b/packages/server/modules/core/rest/upload.js @@ -4,12 +4,21 @@ const { corsMiddleware } = require('@/modules/core/configs/cors') const Busboy = require('busboy') const { validatePermissionsWriteStream } = require('./authUtils') - -const { createObjectsBatched } = require('@/modules/core/services/objects') +const { getFeatureFlags } = require('@/modules/shared/helpers/envHelper') +const { + createObjectsBatched, + createObjectsBatchedAndNoClosures +} = require('@/modules/core/services/objects') const { ObjectHandlingError } = require('@/modules/core/errors/object') const { estimateStringMegabyteSize } = require('@/modules/core/utils/chunking') const MAX_FILE_SIZE = 50 * 1024 * 1024 +const { FF_NO_CLOSURE_WRITES } = getFeatureFlags() + +let objectInsertionService = createObjectsBatched +if (FF_NO_CLOSURE_WRITES) { + objectInsertionService = createObjectsBatchedAndNoClosures +} module.exports = (app) => { app.options('/objects/:streamId', corsMiddleware()) @@ -115,25 +124,27 @@ module.exports = (app) => { await Promise.all(promises) } - const promise = createObjectsBatched(req.params.streamId, objs).catch((e) => { - req.log.error(e, `Upload error.`) - if (!requestDropped) { - switch (e.constructor) { - case ObjectHandlingError: - res - .status(400) - .send(`Error inserting object in the database: ${e.message}`) - break - default: - res - .status(400) - .send( - 'Error inserting object in the database. Check server logs for details' - ) + const promise = objectInsertionService(req.params.streamId, objs).catch( + (e) => { + req.log.error(e, `Upload error.`) + if (!requestDropped) { + switch (e.constructor) { + case ObjectHandlingError: + res + .status(400) + .send(`Error inserting object in the database: ${e.message}`) + break + default: + res + .status(400) + .send( + 'Error inserting object in the database. Check server logs for details' + ) + } } + requestDropped = true } - requestDropped = true - }) + ) promises.push(promise) await promise @@ -204,24 +215,26 @@ module.exports = (app) => { await Promise.all(promises) } - const promise = createObjectsBatched(req.params.streamId, objs).catch((e) => { - req.log.error(e, `Upload error.`) - if (!requestDropped) - switch (e.constructor) { - case ObjectHandlingError: - res - .status(400) - .send(`Error inserting object in the database. ${e.message}`) - break - default: - res - .status(400) - .send( - 'Error inserting object in the database. Check server logs for details' - ) - } - requestDropped = true - }) + const promise = objectInsertionService(req.params.streamId, objs).catch( + (e) => { + req.log.error(e, `Upload error.`) + if (!requestDropped) + switch (e.constructor) { + case ObjectHandlingError: + res + .status(400) + .send(`Error inserting object in the database. ${e.message}`) + break + default: + res + .status(400) + .send( + 'Error inserting object in the database. Check server logs for details' + ) + } + requestDropped = true + } + ) promises.push(promise) await promise diff --git a/packages/server/modules/core/services/objects.js b/packages/server/modules/core/services/objects.js index 7d46fe0ad..1aba84bac 100644 --- a/packages/server/modules/core/services/objects.js +++ b/packages/server/modules/core/services/objects.js @@ -135,6 +135,37 @@ module.exports = { return true }, + async createObjectsBatchedAndNoClosures(streamId, objects) { + const objsToInsert = [] + const ids = [] + + // Prep objects up + objects.forEach((obj) => { + const insertionObject = prepInsertionObject(streamId, obj) + delete insertionObject.__closure + objsToInsert.push(insertionObject) + ids.push(insertionObject.id) + }) + + const objectsBatchSize = 500 + + // step 1: insert objects + if (objsToInsert.length > 0) { + const batches = chunkInsertionObjectArray({ + objects: objsToInsert, + chunkLengthLimit: objectsBatchSize, + chunkSizeLimitMb: 2 + }) + for (const batch of batches) { + prepInsertionObjectBatch(batch) + await Objects().insert(batch).onConflict().ignore() + servicesLogger.info(`Inserted ${batch.length} objects`) + } + } + + return ids + }, + async createObjects(streamId, objects) { // TODO: Switch to knex batch inserting functionality // see http://knexjs.org/#Utility-BatchInsert diff --git a/packages/shared/src/environment/index.ts b/packages/shared/src/environment/index.ts index a76a0a757..c7ed82288 100644 --- a/packages/shared/src/environment/index.ts +++ b/packages/shared/src/environment/index.ts @@ -5,8 +5,12 @@ function parseFeatureFlags() { //INFO // As a convention all feature flags should be prefixed with a FF_ const featureFlagSchema = z.object({ + // Enables the automate module. FF_AUTOMATE_MODULE_ENABLED: z.boolean().default(false), - FF_GENDOAI_MODULE_ENABLED: z.boolean().default(false) + // Enables the gendo ai integration + FF_GENDOAI_MODULE_ENABLED: z.boolean().default(false), + // Disables writing to the closure table in the create objects batched services (re object upload routes) + FF_NO_CLOSURE_WRITES: z.boolean().default(false) }) return parseEnv(process.env, featureFlagSchema.shape) } @@ -16,6 +20,7 @@ let parsedFlags: ReturnType | undefined export function getFeatureFlags(): { FF_AUTOMATE_MODULE_ENABLED: boolean FF_GENDOAI_MODULE_ENABLED: boolean + FF_NO_CLOSURE_WRITES: boolean } { if (!parsedFlags) parsedFlags = parseFeatureFlags() return parsedFlags