feat(objs): adds feature flag for no closure writing
This commit is contained in:
@@ -4,12 +4,21 @@ const { corsMiddleware } = require('@/modules/core/configs/cors')
|
||||
const Busboy = require('busboy')
|
||||
|
||||
const { validatePermissionsWriteStream } = require('./authUtils')
|
||||
|
||||
const { createObjectsBatched } = require('@/modules/core/services/objects')
|
||||
const { getFeatureFlags } = require('@/modules/shared/helpers/envHelper')
|
||||
const {
|
||||
createObjectsBatched,
|
||||
createObjectsBatchedAndNoClosures
|
||||
} = require('@/modules/core/services/objects')
|
||||
const { ObjectHandlingError } = require('@/modules/core/errors/object')
|
||||
const { estimateStringMegabyteSize } = require('@/modules/core/utils/chunking')
|
||||
|
||||
const MAX_FILE_SIZE = 50 * 1024 * 1024
|
||||
const { FF_NO_CLOSURE_WRITES } = getFeatureFlags()
|
||||
|
||||
let objectInsertionService = createObjectsBatched
|
||||
if (FF_NO_CLOSURE_WRITES) {
|
||||
objectInsertionService = createObjectsBatchedAndNoClosures
|
||||
}
|
||||
|
||||
module.exports = (app) => {
|
||||
app.options('/objects/:streamId', corsMiddleware())
|
||||
@@ -115,25 +124,27 @@ module.exports = (app) => {
|
||||
await Promise.all(promises)
|
||||
}
|
||||
|
||||
const promise = createObjectsBatched(req.params.streamId, objs).catch((e) => {
|
||||
req.log.error(e, `Upload error.`)
|
||||
if (!requestDropped) {
|
||||
switch (e.constructor) {
|
||||
case ObjectHandlingError:
|
||||
res
|
||||
.status(400)
|
||||
.send(`Error inserting object in the database: ${e.message}`)
|
||||
break
|
||||
default:
|
||||
res
|
||||
.status(400)
|
||||
.send(
|
||||
'Error inserting object in the database. Check server logs for details'
|
||||
)
|
||||
const promise = objectInsertionService(req.params.streamId, objs).catch(
|
||||
(e) => {
|
||||
req.log.error(e, `Upload error.`)
|
||||
if (!requestDropped) {
|
||||
switch (e.constructor) {
|
||||
case ObjectHandlingError:
|
||||
res
|
||||
.status(400)
|
||||
.send(`Error inserting object in the database: ${e.message}`)
|
||||
break
|
||||
default:
|
||||
res
|
||||
.status(400)
|
||||
.send(
|
||||
'Error inserting object in the database. Check server logs for details'
|
||||
)
|
||||
}
|
||||
}
|
||||
requestDropped = true
|
||||
}
|
||||
requestDropped = true
|
||||
})
|
||||
)
|
||||
promises.push(promise)
|
||||
|
||||
await promise
|
||||
@@ -204,24 +215,26 @@ module.exports = (app) => {
|
||||
await Promise.all(promises)
|
||||
}
|
||||
|
||||
const promise = createObjectsBatched(req.params.streamId, objs).catch((e) => {
|
||||
req.log.error(e, `Upload error.`)
|
||||
if (!requestDropped)
|
||||
switch (e.constructor) {
|
||||
case ObjectHandlingError:
|
||||
res
|
||||
.status(400)
|
||||
.send(`Error inserting object in the database. ${e.message}`)
|
||||
break
|
||||
default:
|
||||
res
|
||||
.status(400)
|
||||
.send(
|
||||
'Error inserting object in the database. Check server logs for details'
|
||||
)
|
||||
}
|
||||
requestDropped = true
|
||||
})
|
||||
const promise = objectInsertionService(req.params.streamId, objs).catch(
|
||||
(e) => {
|
||||
req.log.error(e, `Upload error.`)
|
||||
if (!requestDropped)
|
||||
switch (e.constructor) {
|
||||
case ObjectHandlingError:
|
||||
res
|
||||
.status(400)
|
||||
.send(`Error inserting object in the database. ${e.message}`)
|
||||
break
|
||||
default:
|
||||
res
|
||||
.status(400)
|
||||
.send(
|
||||
'Error inserting object in the database. Check server logs for details'
|
||||
)
|
||||
}
|
||||
requestDropped = true
|
||||
}
|
||||
)
|
||||
promises.push(promise)
|
||||
|
||||
await promise
|
||||
|
||||
@@ -135,6 +135,37 @@ module.exports = {
|
||||
return true
|
||||
},
|
||||
|
||||
async createObjectsBatchedAndNoClosures(streamId, objects) {
|
||||
const objsToInsert = []
|
||||
const ids = []
|
||||
|
||||
// Prep objects up
|
||||
objects.forEach((obj) => {
|
||||
const insertionObject = prepInsertionObject(streamId, obj)
|
||||
delete insertionObject.__closure
|
||||
objsToInsert.push(insertionObject)
|
||||
ids.push(insertionObject.id)
|
||||
})
|
||||
|
||||
const objectsBatchSize = 500
|
||||
|
||||
// step 1: insert objects
|
||||
if (objsToInsert.length > 0) {
|
||||
const batches = chunkInsertionObjectArray({
|
||||
objects: objsToInsert,
|
||||
chunkLengthLimit: objectsBatchSize,
|
||||
chunkSizeLimitMb: 2
|
||||
})
|
||||
for (const batch of batches) {
|
||||
prepInsertionObjectBatch(batch)
|
||||
await Objects().insert(batch).onConflict().ignore()
|
||||
servicesLogger.info(`Inserted ${batch.length} objects`)
|
||||
}
|
||||
}
|
||||
|
||||
return ids
|
||||
},
|
||||
|
||||
async createObjects(streamId, objects) {
|
||||
// TODO: Switch to knex batch inserting functionality
|
||||
// see http://knexjs.org/#Utility-BatchInsert
|
||||
|
||||
@@ -5,8 +5,12 @@ function parseFeatureFlags() {
|
||||
//INFO
|
||||
// As a convention all feature flags should be prefixed with a FF_
|
||||
const featureFlagSchema = z.object({
|
||||
// Enables the automate module.
|
||||
FF_AUTOMATE_MODULE_ENABLED: z.boolean().default(false),
|
||||
FF_GENDOAI_MODULE_ENABLED: z.boolean().default(false)
|
||||
// Enables the gendo ai integration
|
||||
FF_GENDOAI_MODULE_ENABLED: z.boolean().default(false),
|
||||
// Disables writing to the closure table in the create objects batched services (re object upload routes)
|
||||
FF_NO_CLOSURE_WRITES: z.boolean().default(false)
|
||||
})
|
||||
return parseEnv(process.env, featureFlagSchema.shape)
|
||||
}
|
||||
@@ -16,6 +20,7 @@ let parsedFlags: ReturnType<typeof parseFeatureFlags> | undefined
|
||||
export function getFeatureFlags(): {
|
||||
FF_AUTOMATE_MODULE_ENABLED: boolean
|
||||
FF_GENDOAI_MODULE_ENABLED: boolean
|
||||
FF_NO_CLOSURE_WRITES: boolean
|
||||
} {
|
||||
if (!parsedFlags) parsedFlags = parseFeatureFlags()
|
||||
return parsedFlags
|
||||
|
||||
Reference in New Issue
Block a user