diff --git a/packages/fileimport-service/ifc/api.js b/packages/fileimport-service/ifc/api.js index e90282361..ea62993ca 100644 --- a/packages/fileimport-service/ifc/api.js +++ b/packages/fileimport-service/ifc/api.js @@ -31,7 +31,7 @@ module.exports = class ServerAPI { obj.id = crypto.createHash('md5').update(JSON.stringify(obj)).digest('hex') } - await this.createObject({ streamId: this.streamId, object: obj }) + await this.createObject(this.streamId, obj) return obj.id } @@ -40,7 +40,7 @@ module.exports = class ServerAPI { return await this.createObjectsBatched(this.streamId, objs) } - async createObject({ streamId, object }) { + async createObject(streamId, object) { const insertionObject = this.prepInsertionObject(streamId, object) const closures = [] diff --git a/packages/server/modules/activitystream/tests/activity.spec.js b/packages/server/modules/activitystream/tests/activity.spec.js index f56823862..bc5d3ab97 100644 --- a/packages/server/modules/activitystream/tests/activity.spec.js +++ b/packages/server/modules/activitystream/tests/activity.spec.js @@ -129,7 +129,7 @@ describe('Activity @activity', () => { streamSecret.id = resStream1.body.data.streamCreate // create commit (cr2) - testObj2.id = await createObject({ streamId: streamSecret.id, object: testObj2 }) + testObj2.id = await createObject(streamSecret.id, testObj2) const resCommit1 = await sendRequest(userCr.token, { query: `mutation { commitCreate(commit: {streamId: "${streamSecret.id}", branchName: "main", objectId: "${testObj2.id}", message: "first commit"})}` }) @@ -152,7 +152,7 @@ describe('Activity @activity', () => { branchPublic.id = resBranch.body.data.branchCreate // create commit #2 (iz3) - testObj.id = await createObject({ streamId: streamPublic.id, object: testObj }) + testObj.id = await createObject(streamPublic.id, testObj) const resCommit2 = await sendRequest(userIz.token, { query: `mutation { commitCreate(commit: { streamId: "${streamPublic.id}", branchName: "${branchPublic.name}", objectId: "${testObj.id}", message: "first commit" })}` }) diff --git a/packages/server/modules/comments/tests/comments.graph.spec.js b/packages/server/modules/comments/tests/comments.graph.spec.js index 8a1165979..915681fdc 100644 --- a/packages/server/modules/comments/tests/comments.graph.spec.js +++ b/packages/server/modules/comments/tests/comments.graph.spec.js @@ -286,7 +286,7 @@ const queryComments = async ({ apollo, resources, shouldSucceed }) => { bar: crs({ length: 5 }) } - const objectId = await createObject({ streamId: resources.streamId, object }) + const objectId = await createObject(resources.streamId, object) const numberOfComments = 3 const commentIds = await Promise.all( @@ -361,12 +361,9 @@ const queryStreamCommentCount = async ({ apollo, resources, shouldSucceed }) => } const queryObjectCommentCount = async ({ apollo, resources, shouldSucceed }) => { - const objectId = await createObject({ - streamId: resources.streamId, - object: { - foo: 'bar', - noise: crs({ length: 5 }) - } + const objectId = await createObject(resources.streamId, { + foo: 'bar', + noise: crs({ length: 5 }) }) await createComment({ userId: resources.testActorId, @@ -397,12 +394,9 @@ const queryObjectCommentCount = async ({ apollo, resources, shouldSucceed }) => } const queryCommitCommentCount = async ({ apollo, resources, shouldSucceed }) => { - const objectId = await createObject({ - streamId: resources.streamId, - object: { - foo: 'bar', - notSignal: crs({ length: 10 }) - } + const objectId = await createObject(resources.streamId, { + foo: 'bar', + notSignal: crs({ length: 10 }) }) const commitId = await createCommitByBranchName({ streamId: resources.streamId, @@ -444,12 +438,9 @@ const queryCommitCollectionCommentCount = async ({ resources, shouldSucceed }) => { - const objectId = await createObject({ - streamId: resources.streamId, - object: { - foo: 'bar', - almostMakesSense: crs({ length: 10 }) - } + const objectId = await createObject(resources.streamId, { + foo: 'bar', + almostMakesSense: crs({ length: 10 }) }) const commitId = await createCommitByBranchName({ streamId: resources.streamId, @@ -852,10 +843,7 @@ describe('Graphql @comments', () => { }) } - const objectId = await createObject({ - streamId: stream.id, - object: { test: 'object' } - }) + const objectId = await createObject(stream.id, { test: 'object' }) const { id: commentId } = await createComment({ userId: myTestActor.id, diff --git a/packages/server/modules/comments/tests/comments.spec.js b/packages/server/modules/comments/tests/comments.spec.js index 621e958e9..a62b30211 100644 --- a/packages/server/modules/comments/tests/comments.spec.js +++ b/packages/server/modules/comments/tests/comments.spec.js @@ -105,8 +105,8 @@ describe('Comments @comments', () => { stream.id = await createStream({ ...stream, ownerId: user.id }) - testObject1.id = await createObject({ streamId: stream.id, object: testObject1 }) - testObject2.id = await createObject({ streamId: stream.id, object: testObject2 }) + testObject1.id = await createObject(stream.id, testObject1) + testObject2.id = await createObject(stream.id, testObject2) commitId1 = await createCommitByBranchName({ streamId: stream.id, @@ -163,7 +163,7 @@ describe('Comments @comments', () => { const streamA = { name: 'Stream A' } streamA.id = await createStream({ ...streamA, ownerId: user.id }) const objA = { foo: 'bar' } - objA.id = await createObject({ streamId: streamA.id, object: objA }) + objA.id = await createObject(streamA.id, objA) const commA = {} commA.id = await createCommitByBranchName({ streamId: streamA.id, @@ -177,7 +177,7 @@ describe('Comments @comments', () => { const streamB = { name: 'Stream B' } streamB.id = await createStream({ ...streamB, ownerId: otherUser.id }) const objB = { qux: 'mux' } - objB.id = await createObject({ streamId: streamB.id, object: objB }) + objB.id = await createObject(streamB.id, objB) const commB = {} commB.id = await createCommitByBranchName({ streamId: streamB.id, @@ -267,7 +267,7 @@ describe('Comments @comments', () => { const stream = { name: 'Bean Counter' } stream.id = await createStream({ ...stream, ownerId: user.id }) const obj = { foo: 'bar' } - obj.id = await createObject({ streamId: stream.id, object: obj }) + obj.id = await createObject(stream.id, obj) const commit = {} commit.id = await createCommitByBranchName({ streamId: stream.id, @@ -358,7 +358,7 @@ describe('Comments @comments', () => { const streamOther = { name: 'Bean Counter' } streamOther.id = await createStream({ ...streamOther, ownerId: user.id }) const objOther = { 'are you bored': 'yes' } - objOther.id = await createObject({ streamId: streamOther.id, object: objOther }) + objOther.id = await createObject(streamOther.id, objOther) const commitOther = {} commitOther.id = await createCommitByBranchName({ streamId: streamOther.id, @@ -560,10 +560,7 @@ describe('Comments @comments', () => { }) it('Should not return the same comment multiple times for multi resource comments', async () => { - const localObjectId = await createObject({ - streamId: stream.id, - object: { testObject: 1 } - }) + const localObjectId = await createObject(stream.id, { testObject: 1 }) const commentCount = 3 for (let i = 0; i < commentCount; i++) { @@ -603,11 +600,8 @@ describe('Comments @comments', () => { }) it('Should handle cursor and limit for queries', async () => { - const localObjectId = await createObject({ - streamId: stream.id, - object: { - testObject: 'something completely different' - } + const localObjectId = await createObject(stream.id, { + testObject: 'something completely different' }) const createdComments = [] @@ -697,10 +691,7 @@ describe('Comments @comments', () => { }) it('Should return all the referenced resources for a comment', async () => { - const localObjectId = await createObject({ - streamId: stream.id, - object: { anotherTestObject: 1 } - }) + const localObjectId = await createObject(stream.id, { anotherTestObject: 1 }) const inputResources = [ { resourceId: stream.id, resourceType: 'stream' }, { resourceId: commitId1, resourceType: 'commit' }, @@ -731,10 +722,7 @@ describe('Comments @comments', () => { }) it('Should return the same data when querying a single comment vs a list of comments', async () => { - const localObjectId = await createObject({ - streamId: stream.id, - object: { anotherTestObject: 42 } - }) + const localObjectId = await createObject(stream.id, { anotherTestObject: 42 }) await createComment({ userId: user.id, input: { @@ -765,11 +753,8 @@ describe('Comments @comments', () => { }) it('Should be able to edit a comment text', async () => { - const localObjectId = await createObject({ - streamId: stream.id, - object: { - anotherTestObject: crs({ length: 10 }) - } + const localObjectId = await createObject(stream.id, { + anotherTestObject: crs({ length: 10 }) }) const { id: commentId } = await createComment({ userId: user.id, @@ -804,11 +789,8 @@ describe('Comments @comments', () => { }) it('Should not be allowed to edit a comment of another user if its restricted', async () => { - const localObjectId = await createObject({ - streamId: stream.id, - object: { - anotherTestObject: crs({ length: 10 }) - } + const localObjectId = await createObject(stream.id, { + anotherTestObject: crs({ length: 10 }) }) const { id: commentId } = await createComment({ userId: user.id, @@ -926,11 +908,8 @@ describe('Comments @comments', () => { }) it('Should not query archived comments unless asked', async () => { - const localObjectId = await createObject({ - streamId: stream.id, - object: { - testObject: crs({ length: 10 }) - } + const localObjectId = await createObject(stream.id, { + testObject: crs({ length: 10 }) }) const commentCount = 15 diff --git a/packages/server/modules/core/graph/resolvers/objects.ts b/packages/server/modules/core/graph/resolvers/objects.ts index 83e847a46..644ea288a 100644 --- a/packages/server/modules/core/graph/resolvers/objects.ts +++ b/packages/server/modules/core/graph/resolvers/objects.ts @@ -66,10 +66,10 @@ export = { context.resourceAccessRules ) - const ids = await createObjects({ - streamId: args.objectInput.streamId, - objects: args.objectInput.objects - }) + const ids = await createObjects( + args.objectInput.streamId, + args.objectInput.objects + ) return ids } } diff --git a/packages/server/modules/core/rest/upload.js b/packages/server/modules/core/rest/upload.js new file mode 100644 index 000000000..55440c8d9 --- /dev/null +++ b/packages/server/modules/core/rest/upload.js @@ -0,0 +1,294 @@ +'use strict' +const zlib = require('zlib') +const { corsMiddleware } = require('@/modules/core/configs/cors') +const Busboy = require('busboy') + +const { validatePermissionsWriteStream } = require('./authUtils') +const { getFeatureFlags } = require('@/modules/shared/helpers/envHelper') +const { + createObjectsBatched, + createObjectsBatchedAndNoClosures +} = require('@/modules/core/services/objects') +const { ObjectHandlingError } = require('@/modules/core/errors/object') +const { estimateStringMegabyteSize } = require('@/modules/core/utils/chunking') + +const MAX_FILE_SIZE = 50 * 1024 * 1024 +const { FF_NO_CLOSURE_WRITES } = getFeatureFlags() + +let objectInsertionService = createObjectsBatched +if (FF_NO_CLOSURE_WRITES) { + objectInsertionService = createObjectsBatchedAndNoClosures +} + +module.exports = (app) => { + app.options('/objects/:streamId', corsMiddleware()) + + app.post('/objects/:streamId', corsMiddleware(), async (req, res) => { + req.log = req.log.child({ + userId: req.context.userId || '-', + streamId: req.params.streamId + }) + + const hasStreamAccess = await validatePermissionsWriteStream( + req.params.streamId, + req + ) + if (!hasStreamAccess.result) { + return res.status(hasStreamAccess.status).end() + } + + let busboy + try { + busboy = Busboy({ headers: req.headers }) + } catch (e) { + req.log.warn( + e, + 'Failed to parse request headers and body content as valid multipart/form-data.' + ) + return res + .status(400) + .send( + 'Failed to parse request headers and body content as valid multipart/form-data.' + ) + } + let totalProcessed = 0 + // let last = {} + + const promises = [] + let requestDropped = false + + busboy.on('file', (name, file, info) => { + const { mimeType } = info + + if (requestDropped) return + + if (mimeType === 'application/gzip') { + const buffer = [] + + file.on('data', (data) => { + if (data) buffer.push(data) + }) + + file.on('end', async () => { + req.log.info( + `File upload of the multipart form has reached an end of file (EOF) boundary. The mimetype of the file is '${mimeType}'.` + ) + if (requestDropped) return + const t0 = Date.now() + let objs = [] + + const gzippedBuffer = Buffer.concat(buffer) + if (gzippedBuffer.length > MAX_FILE_SIZE) { + req.log.error( + `Upload error: Batch size too large (${gzippedBuffer.length} > ${MAX_FILE_SIZE})` + ) + if (!requestDropped) + res + .status(400) + .send( + `File size too large (${gzippedBuffer.length} > ${MAX_FILE_SIZE})` + ) + requestDropped = true + } + + const gunzippedBuffer = zlib.gunzipSync(gzippedBuffer).toString() + const gunzippedBufferMegabyteSize = + estimateStringMegabyteSize(gunzippedBuffer) + if (gunzippedBufferMegabyteSize > MAX_FILE_SIZE) { + req.log.error( + `upload error: batch size too large (${gunzippedBufferMegabyteSize} > ${MAX_FILE_SIZE})` + ) + if (!requestDropped) + res + .status(400) + .send( + `File size too large (${gunzippedBufferMegabyteSize} > ${MAX_FILE_SIZE})` + ) + requestDropped = true + } + + try { + objs = JSON.parse(gunzippedBuffer) + } catch { + req.log.error(`Upload error: Batch not in JSON format`) + if (!requestDropped) res.status(400).send('Failed to parse data.') + requestDropped = true + } + + // last = objs[objs.length - 1] + totalProcessed += objs.length + + let previouslyAwaitedPromises = 0 + while (previouslyAwaitedPromises !== promises.length) { + previouslyAwaitedPromises = promises.length + await Promise.all(promises) + } + + const promise = objectInsertionService(req.params.streamId, objs).catch( + (e) => { + req.log.error(e, `Upload error.`) + if (!requestDropped) { + switch (e.constructor) { + case ObjectHandlingError: + res + .status(400) + .send(`Error inserting object in the database: ${e.message}`) + break + default: + res + .status(400) + .send( + 'Error inserting object in the database. Check server logs for details' + ) + } + } + requestDropped = true + } + ) + promises.push(promise) + + await promise + + req.log.info( + { + objectCount: objs.length, + durationSeconds: (Date.now() - t0) / 1000, + crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024, + uploadedSizeMB: gunzippedBuffer.length / 1000000, + requestDropped + }, + 'Uploaded batch of {objectCount} objects' + ) + }) + } else if ( + mimeType === 'text/plain' || + mimeType === 'application/json' || + mimeType === 'application/octet-stream' + ) { + let buffer = '' + file.on('data', (data) => { + if (data) buffer += data + }) + + file.on('end', async () => { + if (requestDropped) return + const t0 = Date.now() + let objs = [] + + if (buffer.length > MAX_FILE_SIZE) { + req.log.error( + `Upload error: Batch size too large (${buffer.length} > ${MAX_FILE_SIZE})` + ) + if (!requestDropped) + res + .status(400) + .send(`File size too large (${buffer.length} > ${MAX_FILE_SIZE})`) + requestDropped = true + } + + try { + objs = JSON.parse(buffer) + } catch { + req.log.error(`Upload error: Batch not in JSON format`) + if (!requestDropped) + res.status(400).send('Failed to parse data. Batch is not in JSON format.') + requestDropped = true + } + if (!Array.isArray(objs)) { + req.log.error(`Upload error: Batch not an array`) + if (!requestDropped) + res + .status(400) + .send( + 'Failed to parse data. Batch is expected to be wrapped in a JSON array.' + ) + requestDropped = true + } + //FIXME should we exit here if requestDropped is true + + totalProcessed += objs.length + req.log.debug( + `total objects, including current pending batch, processed so far is ${totalProcessed}` + ) + let previouslyAwaitedPromises = 0 + while (previouslyAwaitedPromises !== promises.length) { + previouslyAwaitedPromises = promises.length + await Promise.all(promises) + } + + const promise = objectInsertionService(req.params.streamId, objs).catch( + (e) => { + req.log.error(e, `Upload error.`) + if (!requestDropped) + switch (e.constructor) { + case ObjectHandlingError: + res + .status(400) + .send(`Error inserting object in the database. ${e.message}`) + break + default: + res + .status(400) + .send( + 'Error inserting object in the database. Check server logs for details' + ) + } + requestDropped = true + } + ) + promises.push(promise) + + await promise + req.log.info( + { + objectCount: objs.length, + uploadedSizeMB: estimateStringMegabyteSize(buffer), + durationSeconds: (Date.now() - t0) / 1000, + crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024, + requestDropped + }, + 'Uploaded batch of {objectCount} objects.' + ) + }) + } else { + req.log.info(`Invalid ContentType header: ${mimeType}`) + if (!requestDropped) + res + .status(400) + .send( + 'Invalid ContentType header. This route only accepts "application/gzip", "text/plain" or "application/json".' + ) + requestDropped = true + } + }) + + busboy.on('finish', async () => { + if (requestDropped) return + + req.log.info( + { + totalProcessed, + crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024 + }, + 'Upload finished: {totalProcessed} objects' + ) + + let previouslyAwaitedPromises = 0 + while (previouslyAwaitedPromises !== promises.length) { + previouslyAwaitedPromises = promises.length + await Promise.all(promises) + } + + res.status(201).end() + }) + + busboy.on('error', async (err) => { + req.log.info(`Upload error: ${err}`) + if (!requestDropped) + res.status(400).end('Upload request error. The server logs have more details') + requestDropped = true + }) + + req.pipe(busboy) + }) +} diff --git a/packages/server/modules/core/rest/upload.ts b/packages/server/modules/core/rest/upload.ts deleted file mode 100644 index aa9582bd3..000000000 --- a/packages/server/modules/core/rest/upload.ts +++ /dev/null @@ -1,416 +0,0 @@ -import zlib from 'zlib' -import { corsMiddleware } from '@/modules/core/configs/cors' -import Busboy from 'busboy' -import { validatePermissionsWriteStream } from '@/modules/core/rest/authUtils' -import { - getFeatureFlags, - maximumObjectUploadFileSizeMb -} from '@/modules/shared/helpers/envHelper' -import { - createObjectsBatched, - createObjectsBatchedAndNoClosures -} from '@/modules/core/services/objects' -import { ObjectHandlingError } from '@/modules/core/errors/object' -import { estimateStringMegabyteSize } from '@/modules/core/utils/chunking' -import { toMegabytesWith1DecimalPlace } from '@/modules/core/utils/formatting' -import { Logger } from 'pino' -import { Router } from 'express' - -const MAX_FILE_SIZE = maximumObjectUploadFileSizeMb() * 1024 * 1024 -const { FF_NO_CLOSURE_WRITES } = getFeatureFlags() - -let objectInsertionService: (params: { - streamId: string - objects: unknown[] - logger?: Logger -}) => Promise = createObjectsBatched -if (FF_NO_CLOSURE_WRITES) { - objectInsertionService = createObjectsBatchedAndNoClosures -} - -export default (app: Router) => { - app.options('/objects/:streamId', corsMiddleware()) - - app.post('/objects/:streamId', corsMiddleware(), async (req, res) => { - const calculateLogMetadata = (params: { - batchSizeMb: number - start: number - batchStartTime: number - totalObjectsProcessed: number - }) => { - return { - batchSizeMb: params.batchSizeMb, - maxFileSizeMb: toMegabytesWith1DecimalPlace(MAX_FILE_SIZE), - elapsedTimeMs: Date.now() - params.start, - batchElapsedTimeMs: Date.now() - params.batchStartTime, - totalObjectsProcessed: params.totalObjectsProcessed - } - } - - req.log = req.log.child({ - userId: req.context.userId || '-', - streamId: req.params.streamId - }) - - const start = Date.now() - - const hasStreamAccess = await validatePermissionsWriteStream( - req.params.streamId, - req - ) - if (!hasStreamAccess.result) { - return res.status(hasStreamAccess.status).end() - } - - let busboy - try { - busboy = Busboy({ headers: req.headers }) - } catch (e) { - req.log.warn( - e, - 'Failed to parse request headers and body content as valid multipart/form-data.' - ) - return res - .status(400) - .send( - 'Failed to parse request headers and body content as valid multipart/form-data.' - ) - } - let totalObjectsProcessed = 0 - - const promises: Promise[] = [] - let requestDropped = false - - busboy.on('file', (name, file, info) => { - const { mimeType } = info - - if (requestDropped) return - - if (mimeType === 'application/gzip') { - const buffer: Uint8Array[] = [] - - file.on('data', (data) => { - if (data) buffer.push(data) - }) - - file.on('end', async () => { - req.log.info( - `File upload of the multipart form has reached an end of file (EOF) boundary. The mimetype of the file is '${mimeType}'.` - ) - if (requestDropped) return - const batchStartTime = Date.now() - let objs = [] - - const gzippedBuffer = Buffer.concat(buffer) - if (gzippedBuffer.length > MAX_FILE_SIZE) { - req.log.error( - calculateLogMetadata({ - batchSizeMb: toMegabytesWith1DecimalPlace(gzippedBuffer.length), - start, - batchStartTime, - totalObjectsProcessed - }), - 'Upload error: Batch size too large ({batchSizeMb} > {maxFileSizeMb}). Error occurred after {elapsedTimeMs}ms. This batch took {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.' - ) - if (!requestDropped) - res - .status(400) - .send( - `File size too large (${gzippedBuffer.length} > ${MAX_FILE_SIZE})` - ) - requestDropped = true - } - - const gunzippedBuffer = zlib.gunzipSync(gzippedBuffer).toString() - const gunzippedBufferMegabyteSize = - estimateStringMegabyteSize(gunzippedBuffer) - if (gunzippedBufferMegabyteSize > MAX_FILE_SIZE) { - req.log.error( - calculateLogMetadata({ - batchSizeMb: gunzippedBufferMegabyteSize, - start, - batchStartTime, - totalObjectsProcessed - }), - 'Upload error: batch size too large ({batchSizeMb} > {maxFileSizeMb}). Error occurred after {elapsedTimeMs}ms. This batch took {batchElapsedTimeMs}ms. Total objects processed before error: {totalObjectsProcessed}.' - ) - if (!requestDropped) - res - .status(400) - .send( - `File size too large (${gunzippedBufferMegabyteSize} > ${MAX_FILE_SIZE})` - ) - requestDropped = true - } - - try { - objs = JSON.parse(gunzippedBuffer) - } catch { - req.log.error( - calculateLogMetadata({ - batchSizeMb: gunzippedBufferMegabyteSize, - start, - batchStartTime, - totalObjectsProcessed - }), - 'Upload error: Batch not in JSON format. Error occurred after {elapsedTimeMs}ms. This batch of objects took {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.' - ) - if (!requestDropped) res.status(400).send('Failed to parse data.') - requestDropped = true - } - - // last = objs[objs.length - 1] - totalObjectsProcessed += objs.length - - let previouslyAwaitedPromises = 0 - while (previouslyAwaitedPromises !== promises.length) { - previouslyAwaitedPromises = promises.length - await Promise.all(promises) - } - - const promise = objectInsertionService({ - streamId: req.params.streamId, - objects: objs, - logger: req.log - }).catch((e) => { - req.log.error( - { - ...calculateLogMetadata({ - batchSizeMb: gunzippedBufferMegabyteSize, - start, - batchStartTime, - totalObjectsProcessed - }), - objectCount: objs.length, - err: e - }, - `Upload error when inserting objects into database. Number of objects: {objectCount}. This batch took {batchElapsedTimeMs}ms. Error occurred after {elapsedTimeMs}ms. Total objects processed before error: {totalObjectsProcessed}.` - ) - if (!requestDropped) { - switch (e.constructor) { - case ObjectHandlingError: - res - .status(400) - .send(`Error inserting object in the database: ${e.message}`) - break - default: - res - .status(400) - .send( - 'Error inserting object in the database. Check server logs for details' - ) - } - } - requestDropped = true - }) - promises.push(promise) - - await promise - - req.log.info( - { - objectCount: objs.length, - elapsedTimeMs: Date.now() - start, - batchElapsedTimeMs: Date.now() - batchStartTime, - crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024, - uploadedSizeMB: toMegabytesWith1DecimalPlace(gunzippedBuffer.length), - requestDropped, - totalObjectsProcessed - }, - 'Uploaded batch of {objectCount} objects in {batchElapsedTimeMs}ms. Total objects processed so far: {totalObjectsProcessed} in a total of {elapsedTimeMs}ms.' - ) - }) - } else if ( - mimeType === 'text/plain' || - mimeType === 'application/json' || - mimeType === 'application/octet-stream' - ) { - let buffer = '' - file.on('data', (data) => { - if (data) buffer += data - }) - - file.on('end', async () => { - if (requestDropped) return - const batchStartTime = Date.now() - let objs = [] - - if (buffer.length > MAX_FILE_SIZE) { - req.log.error( - calculateLogMetadata({ - batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length), - start, - batchStartTime, - totalObjectsProcessed - }), - 'Upload error: Batch size too large ({batchSizeMb} > {maxFileSizeMb}). Error occurred after {elapsedTimeMs}ms. This batch took {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.' - ) - if (!requestDropped) - res - .status(400) - .send(`File size too large (${buffer.length} > ${MAX_FILE_SIZE})`) - requestDropped = true - } - - try { - objs = JSON.parse(buffer) - } catch { - req.log.error( - calculateLogMetadata({ - batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length), - start, - batchStartTime, - totalObjectsProcessed - }), - 'Upload error: Batch not in JSON format. Error occurred after {elapsedTimeMs}ms. This batch failed after {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.' - ) - if (!requestDropped) - res.status(400).send('Failed to parse data. Batch is not in JSON format.') - requestDropped = true - } - if (!Array.isArray(objs)) { - req.log.error( - calculateLogMetadata({ - batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length), - start, - batchStartTime, - totalObjectsProcessed - }), - 'Upload error: Batch not an array. Error occurred after {elapsedTimeMs}ms. This batch failed after {batchElapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}.' - ) - if (!requestDropped) - res - .status(400) - .send( - 'Failed to parse data. Batch is expected to be wrapped in a JSON array.' - ) - requestDropped = true - } - //FIXME should we exit here if requestDropped is true - - totalObjectsProcessed += objs.length - req.log.debug( - { - ...calculateLogMetadata({ - batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length), - start, - batchStartTime, - totalObjectsProcessed - }), - objectCount: objs.length - }, - 'Total objects, including current pending batch of {objectCount} objects, processed so far is {totalObjectsProcessed}. This batch has taken {batchElapsedTimeMs}ms. Total time elapsed is {elapsedTimeMs}ms.' - ) - let previouslyAwaitedPromises = 0 - while (previouslyAwaitedPromises !== promises.length) { - previouslyAwaitedPromises = promises.length - await Promise.all(promises) - } - - const promise = objectInsertionService({ - streamId: req.params.streamId, - objects: objs, - logger: req.log - }).catch((e) => { - req.log.error( - { - ...calculateLogMetadata({ - batchSizeMb: toMegabytesWith1DecimalPlace(buffer.length), - start, - batchStartTime, - totalObjectsProcessed - }), - err: e - }, - `Upload error when inserting objects into database. Number of objects: {objectCount}. This batch took {batchElapsedTimeMs}ms. Error occurred after {elapsedTimeMs}ms. Total objects processed before error: {totalObjectsProcessed}.` - ) - if (!requestDropped) - switch (e.constructor) { - case ObjectHandlingError: - res - .status(400) - .send(`Error inserting object in the database. ${e.message}`) - break - default: - res - .status(400) - .send( - 'Error inserting object in the database. Check server logs for details' - ) - } - requestDropped = true - }) - promises.push(promise) - - await promise - req.log.info( - { - ...calculateLogMetadata({ - batchSizeMb: estimateStringMegabyteSize(buffer), - start, - batchStartTime, - totalObjectsProcessed - }), - objectCount: objs.length, - crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024 - }, - 'Uploaded batch of {objectCount} objects. Total number of objects processed is {totalObjectsProcessed}. This batch took {batchElapsedTimeMs}ms.' - ) - }) - } else { - req.log.info( - { - mimeType, - totalObjectsProcessed - }, - 'Invalid ContentType header: {mimeType}. Total number of objects processed so far: {totalObjectsProcessed}.' - ) - if (!requestDropped) - res - .status(400) - .send( - 'Invalid ContentType header. This route only accepts "application/gzip", "text/plain" or "application/json".' - ) - requestDropped = true - } - }) - - busboy.on('finish', async () => { - if (requestDropped) return - - req.log.info( - { - totalObjectsProcessed, - crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024, - elapsedTimeMs: Date.now() - start - }, - 'Upload finished: {totalObjectsProcessed} objects processed in {elapsedTimeMs}ms' - ) - - let previouslyAwaitedPromises = 0 - while (previouslyAwaitedPromises !== promises.length) { - previouslyAwaitedPromises = promises.length - await Promise.all(promises) - } - - res.status(201).end() - }) - - busboy.on('error', async (err) => { - req.log.info( - { - err, - totalObjectsProcessed, - elapsedTimeMs: Date.now() - start, - crtMemUsageMB: process.memoryUsage().heapUsed / 1024 / 1024 - }, - 'Error during upload. Error occurred after {elapsedTimeMs}ms. Objects processed before error: {totalObjectsProcessed}. Error: {error}' - ) - if (!requestDropped) - res.status(400).end('Upload request error. The server logs have more details') - requestDropped = true - }) - - req.pipe(busboy) - }) -} diff --git a/packages/server/modules/core/services/objects.js b/packages/server/modules/core/services/objects.js index 2e5bf25af..8f67662b0 100644 --- a/packages/server/modules/core/services/objects.js +++ b/packages/server/modules/core/services/objects.js @@ -17,10 +17,9 @@ const Closures = () => knex('object_children_closure') module.exports = { /** - * @param {{streamId, object, logger?}} params * @returns {Promise} */ - async createObject({ streamId, object, logger = servicesLogger }) { + async createObject(streamId, object) { const insertionObject = prepInsertionObject(streamId, object) const closures = [] @@ -58,12 +57,10 @@ module.exports = { } } - logger.debug({ objectId: insertionObject.id }, 'Inserted object: {objectId}') - return insertionObject.id }, - async createObjectsBatched({ streamId, objects, logger = servicesLogger }) { + async createObjectsBatched(streamId, objects) { const closures = [] const objsToInsert = [] const ids = [] @@ -115,7 +112,10 @@ module.exports = { for (const batch of batches) { prepInsertionObjectBatch(batch) await Objects().insert(batch).onConflict().ignore() - logger.info({ objectCount: batch.length }, 'Inserted {objectCount} objects') + servicesLogger.info( + { objectCount: batch.length }, + 'Inserted ${objectCount} objects' + ) } } @@ -126,17 +126,16 @@ module.exports = { for (const batch of batches) { prepInsertionClosureBatch(batch) await Closures().insert(batch).onConflict().ignore() - logger.info({ batchLength: batch.length }, 'Inserted {batchLength} closures') + servicesLogger.info( + { batchLength: batch.length }, + 'Inserted ${batchLength} closures' + ) } } return true }, - async createObjectsBatchedAndNoClosures({ - streamId, - objects, - logger = servicesLogger - }) { + async createObjectsBatchedAndNoClosures(streamId, objects) { const objsToInsert = [] const ids = [] @@ -160,7 +159,10 @@ module.exports = { for (const batch of batches) { prepInsertionObjectBatch(batch) await Objects().insert(batch).onConflict().ignore() - logger.info({ batchLength: batch.length }, 'Inserted {batchLength} objects.') + servicesLogger.info( + { batchLength: batch.length }, + 'Inserted {batchLength} objects' + ) } } @@ -170,7 +172,7 @@ module.exports = { /** * @returns {Promise} */ - async createObjects({ streamId, objects, logger = servicesLogger }) { + async createObjects(streamId, objects) { // TODO: Switch to knex batch inserting functionality // see http://knexjs.org/#Utility-BatchInsert const batches = [] @@ -234,16 +236,12 @@ module.exports = { } const t1 = performance.now() - - logger.info( - { - batchIndex: index + 1, - totalCountOfBatches: batches.length, - countStoredObjects: closures.length + objsToInsert.length, - elapsedTimeMs: t1 - t0 - }, - 'Batch {batchIndex}/{totalCountOfBatches}: Stored {countStoredObjects} objects in {elapsedTimeMs}ms.' + servicesLogger.info( + `Batch ${index + 1}/${batches.length}: Stored ${ + closures.length + objsToInsert.length + } objects in ${t1 - t0}ms.` ) + // logger.debug( `Batch ${index + 1}/${batches.length}: Stored ${closures.length + objsToInsert.length} objects in ${t1-t0}ms.` ) } const promises = batches.map((batch, index) => insertBatch(batch, index)) diff --git a/packages/server/modules/core/tests/branches.spec.js b/packages/server/modules/core/tests/branches.spec.js index d6ba97972..6c83e9a5b 100644 --- a/packages/server/modules/core/tests/branches.spec.js +++ b/packages/server/modules/core/tests/branches.spec.js @@ -47,7 +47,7 @@ describe('Branches @core-branches', () => { user.id = await createUser(user) stream.id = await createStream({ ...stream, ownerId: user.id }) - testObject.id = await createObject({ streamId: stream.id, object: testObject }) + testObject.id = await createObject(stream.id, testObject) }) const branch = { name: 'dim/dev' } diff --git a/packages/server/modules/core/tests/commits.spec.js b/packages/server/modules/core/tests/commits.spec.js index 168816498..1e7654ec7 100644 --- a/packages/server/modules/core/tests/commits.spec.js +++ b/packages/server/modules/core/tests/commits.spec.js @@ -50,7 +50,7 @@ describe('Commits @core-commits', () => { } const generateObject = async (streamId = stream.id, object = testObject) => - await createObject({ streamId, object }) + await createObject(streamId, object) const generateStream = async (streamBase = stream, ownerId = user.id) => await createStream({ ...streamBase, ownerId }) @@ -62,15 +62,9 @@ describe('Commits @core-commits', () => { user.id = await createUser(user) stream.id = await createStream({ ...stream, ownerId: user.id }) - const testObjectId = await createObject({ streamId: stream.id, object: testObject }) - const testObject2Id = await createObject({ - streamId: stream.id, - object: testObject2 - }) - const testObject3Id = await createObject({ - streamId: stream.id, - object: testObject3 - }) + const testObjectId = await createObject(stream.id, testObject) + const testObject2Id = await createObject(stream.id, testObject2) + const testObject3Id = await createObject(stream.id, testObject3) commitId1 = await createCommitByBranchName({ streamId: stream.id, @@ -218,7 +212,7 @@ describe('Commits @core-commits', () => { for (let i = 0; i < 10; i++) { const t = { qux: i } - t.id = await createObject({ streamId, object: t }) + t.id = await createObject(streamId, t) await createCommitByBranchName({ streamId, branchName: 'main', @@ -258,7 +252,7 @@ describe('Commits @core-commits', () => { for (let i = 0; i < 15; i++) { const t = { thud: i } - t.id = await createObject({ streamId, object: t }) + t.id = await createObject(streamId, t) await createCommitByBranchName({ streamId, branchName: 'dim/dev', diff --git a/packages/server/modules/core/tests/objects.spec.js b/packages/server/modules/core/tests/objects.spec.js index c663a873b..8149d9610 100644 --- a/packages/server/modules/core/tests/objects.spec.js +++ b/packages/server/modules/core/tests/objects.spec.js @@ -63,8 +63,8 @@ describe('Objects @core-objects', () => { }) it('Should create objects', async () => { - sampleObject.id = await createObject({ streamId: stream.id, object: sampleObject }) - sampleCommit.id = await createObject({ streamId: stream.id, object: sampleCommit }) + sampleObject.id = await createObject(stream.id, sampleObject) + sampleCommit.id = await createObject(stream.id, sampleCommit) }) const objCount_1 = 10 @@ -80,7 +80,7 @@ describe('Objects @core-objects', () => { }) } - const ids = await createObjects({ streamId: stream.id, objects: objs }) + const ids = await createObjects(stream.id, objs) expect(ids).to.have.lengthOf(objCount_1) }).timeout(30000) @@ -109,7 +109,7 @@ describe('Objects @core-objects', () => { }) } - const myIds = await createObjects({ streamId: stream.id, objects: objs2 }) + const myIds = await createObjects(stream.id, objs2) myIds.forEach((h, i) => (objs2[i].id = h)) @@ -127,7 +127,7 @@ describe('Objects @core-objects', () => { return obj }, {}) } - const id = await createObject({ streamId: stream.id, object: obj }) + const id = await createObject(stream.id, obj) expect(id).to.be.ok }) @@ -156,16 +156,16 @@ describe('Objects @core-objects', () => { it('Should get object children', async () => { const objs_1 = createManyObjects(100, 'noise__') - const ids = await createObjects({ streamId: stream.id, objects: objs_1 }) + const ids = await createObjects(stream.id, objs_1) // console.log( ids ) // console.log(ids[ 0 ]) // The below are just performance benchmarking. // let objs_2 = createManyObjects( 20000, 'noise_2' ) - // let ids2 = await createObjects( {streamId: stream.id, objects: objs_2} ) + // let ids2 = await createObjects( objs_2 ) // let objs_3 = createManyObjects( 100000, 'noise_3' ) - // let ids3 = await createObjects( {streamId: stream.id, objects: objs_3} ) + // let ids3 = await createObjects( objs_3 ) // let { rows } = await getObjectChildren( { objectId: ids[0], select: ['id', 'name', 'sortValueB'] } ) // let { rows } = await getObjectChildren( { objectId: ids[ 0 ] } ) @@ -494,7 +494,7 @@ describe('Objects @core-objects', () => { const objs = createManyObjects(3333, 'perlin merlin magic') commitId = objs[0].id - await createObjectsBatched({ streamId: stream.id, objects: objs }) + await createObjectsBatched(stream.id, objs) const parent = await getObject({ streamId: stream.id, objectId: commitId }) expect(parent.totalChildrenCount).to.equal(3333) @@ -539,10 +539,7 @@ describe('Objects @core-objects', () => { const promisses = [] for (let i = 0; i < shuffledVersions.length; i++) { - const promise = createObjectsBatched({ - streamId: stream.id, - objects: shuffledVersions[i] - }) + const promise = createObjectsBatched(stream.id, shuffledVersions[i]) promise.catch(() => {}) promisses.push(promise) } diff --git a/packages/server/modules/core/tests/streams.spec.ts b/packages/server/modules/core/tests/streams.spec.ts index 851510506..7681ebdd8 100644 --- a/packages/server/modules/core/tests/streams.spec.ts +++ b/packages/server/modules/core/tests/streams.spec.ts @@ -341,10 +341,7 @@ describe('Streams @core-streams', () => { it('Should update stream updatedAt on commit operations ', async () => { const testObject = { foo: 'bar', baz: 'qux', id: '' } - testObject.id = await createObject({ - streamId: updatableStream.id, - object: testObject - }) + testObject.id = await createObject(updatableStream.id, testObject) await createCommitByBranchName({ streamId: updatableStream.id, diff --git a/packages/server/modules/core/tests/users.spec.js b/packages/server/modules/core/tests/users.spec.js index 11285c7ed..67f4fb649 100644 --- a/packages/server/modules/core/tests/users.spec.js +++ b/packages/server/modules/core/tests/users.spec.js @@ -141,10 +141,7 @@ describe('Actors & Tokens @user-services', () => { }) // create an object and a commit around it on the multiowner stream - const objId = await createObject({ - streamId: multiOwnerStream.id, - object: { pie: 'in the sky' } - }) + const objId = await createObject(multiOwnerStream.id, { pie: 'in the sky' }) const commitId = await createCommitByBranchName({ streamId: multiOwnerStream.id, branchName: 'ballmer/dev', diff --git a/packages/server/modules/core/utils/formatting.ts b/packages/server/modules/core/utils/formatting.ts deleted file mode 100644 index ca563325a..000000000 --- a/packages/server/modules/core/utils/formatting.ts +++ /dev/null @@ -1,2 +0,0 @@ -export const toMegabytesWith1DecimalPlace = (bytes: number) => - Math.round((bytes * 10) / 1024 / 1024) / 10 diff --git a/packages/server/modules/cross-server-sync/services/commit.ts b/packages/server/modules/cross-server-sync/services/commit.ts index 69e81cd37..9ee2717b3 100644 --- a/packages/server/modules/cross-server-sync/services/commit.ts +++ b/packages/server/modules/cross-server-sync/services/commit.ts @@ -495,13 +495,10 @@ const createNewObject = async ( return } - const newObjectId = await createObject({ - streamId: targetStreamId, - object: { - ...newObject, - id: newObject.id, - speckleType: newObject.speckleType || newObject.speckle_type || 'Base' - } + const newObjectId = await createObject(targetStreamId, { + ...newObject, + id: newObject.id, + speckleType: newObject.speckleType || newObject.speckle_type || 'Base' }) const newRecord = await getObject(newObjectId, targetStreamId) diff --git a/packages/server/modules/shared/helpers/envHelper.ts b/packages/server/modules/shared/helpers/envHelper.ts index bcb615d5d..2091f87df 100644 --- a/packages/server/modules/shared/helpers/envHelper.ts +++ b/packages/server/modules/shared/helpers/envHelper.ts @@ -405,7 +405,3 @@ export function postgresMaxConnections() { export function highFrequencyMetricsCollectionPeriodMs() { return getIntFromEnv('HIGH_FREQUENCY_METRICS_COLLECTION_PERIOD_MS', '100') } - -export function maximumObjectUploadFileSizeMb() { - return getIntFromEnv('MAX_OBJECT_UPLOAD_FILE_SIZE_MB', '50') -} diff --git a/packages/server/modules/stats/tests/stats.spec.js b/packages/server/modules/stats/tests/stats.spec.js index a47b4fd78..122ff1a50 100644 --- a/packages/server/modules/stats/tests/stats.spec.js +++ b/packages/server/modules/stats/tests/stats.spec.js @@ -230,10 +230,7 @@ async function seedDb({ const streamIds = await Promise.all(streamPromises) // create a objects - const objs = await createObjects({ - streamId: streamIds[0], - objects: createManyObjects(numObjects - 1) - }) + const objs = await createObjects(streamIds[0], createManyObjects(numObjects - 1)) // create commits referencing those objects const commitPromises = [] diff --git a/packages/server/package.json b/packages/server/package.json index c8aa9496d..d06382068 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -125,7 +125,6 @@ "@tiptap/core": "^2.0.0-beta.176", "@types/bcrypt": "^5.0.0", "@types/bull": "^3.15.9", - "@types/busboy": "^1.5.4", "@types/chai-as-promised": "^7.1.8", "@types/compression": "^1.7.2", "@types/connect-redis": "^0.0.23", diff --git a/packages/server/test/speckle-helpers/commitHelper.ts b/packages/server/test/speckle-helpers/commitHelper.ts index 7a6fd716c..f474574aa 100644 --- a/packages/server/test/speckle-helpers/commitHelper.ts +++ b/packages/server/test/speckle-helpers/commitHelper.ts @@ -36,7 +36,7 @@ export type BasicTestCommit = { } export async function createTestObject(params: { projectId: string }) { - return await createObject({ streamId: params.projectId, object: { foo: 'bar' } }) + return await createObject(params.projectId, { foo: 'bar' }) } /** @@ -46,9 +46,7 @@ async function ensureObjects(commits: BasicTestCommit[]) { const commitsWithoutObjects = commits.filter((c) => !c.objectId) await Promise.all( commitsWithoutObjects.map((c) => - createObject({ streamId: c.streamId, object: { foo: 'bar' } }).then( - (oid) => (c.objectId = oid) - ) + createObject(c.streamId, { foo: 'bar' }).then((oid) => (c.objectId = oid)) ) ) } diff --git a/utils/helm/speckle-server/templates/_helpers.tpl b/utils/helm/speckle-server/templates/_helpers.tpl index 0fa4a375b..6c5120a0d 100644 --- a/utils/helm/speckle-server/templates/_helpers.tpl +++ b/utils/helm/speckle-server/templates/_helpers.tpl @@ -594,9 +594,6 @@ Generate the environment variables for Speckle server and Speckle objects deploy - name: MAX_OBJECT_SIZE_MB value: {{ .Values.server.max_object_size_mb | quote }} -- name: MAX_OBJECT_UPLOAD_FILE_SIZE_MB - value: {{ .Values.server.max_object_upload_file_size_mb | quote }} - {{- if .Values.server.migration.movedFrom }} - name: MIGRATION_SERVER_MOVED_FROM value: {{ .Values.server.migration.movedFrom }} diff --git a/utils/helm/speckle-server/values.schema.json b/utils/helm/speckle-server/values.schema.json index 4d5312153..ff7a9659f 100644 --- a/utils/helm/speckle-server/values.schema.json +++ b/utils/helm/speckle-server/values.schema.json @@ -551,11 +551,6 @@ "description": "The maximum size of an individual object which can be uploaded to the server", "default": 10 }, - "max_object_upload_file_size_mb": { - "type": "number", - "description": "Objects are batched together and uploaded to the /objects endpoint as http POST form data. This determines the maximum size of that form data which can be uploaded to the server", - "default": 50 - }, "max_project_models_per_page": { "type": "number", "description": "The maximum number of models that can be returned in a single page of a query for all models of a project", diff --git a/utils/helm/speckle-server/values.yaml b/utils/helm/speckle-server/values.yaml index 67e2e720f..da5d45b40 100644 --- a/utils/helm/speckle-server/values.yaml +++ b/utils/helm/speckle-server/values.yaml @@ -423,8 +423,6 @@ server: ## @param server.max_object_size_mb The maximum size of an individual object which can be uploaded to the server max_object_size_mb: 10 - ## @param server.max_object_upload_file_size_mb Objects are batched together and uploaded to the /objects endpoint as http POST form data. This determines the maximum size of that form data which can be uploaded to the server - max_object_upload_file_size_mb: 50 ## @param server.max_project_models_per_page The maximum number of models that can be returned in a single page of a query for all models of a project max_project_models_per_page: 500 ## @param server.speckleAutomateUrl The url of the Speckle Automate instance diff --git a/yarn.lock b/yarn.lock index 4e0bd4534..b6e146fc0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -15398,7 +15398,6 @@ __metadata: "@tiptap/core": "npm:^2.0.0-beta.176" "@types/bcrypt": "npm:^5.0.0" "@types/bull": "npm:^3.15.9" - "@types/busboy": "npm:^1.5.4" "@types/chai-as-promised": "npm:^7.1.8" "@types/compression": "npm:^1.7.2" "@types/connect-redis": "npm:^0.0.23" @@ -17891,15 +17890,6 @@ __metadata: languageName: node linkType: hard -"@types/busboy@npm:^1.5.4": - version: 1.5.4 - resolution: "@types/busboy@npm:1.5.4" - dependencies: - "@types/node": "npm:*" - checksum: 10/43cdd26754603fbee81f538ac52769f2cc8445d5f238666845d99a9fee22e0b608a075d0c346f78c43ade4ce4ec04433a51a1ffa21524ca29ead9d2375f4ec9c - languageName: node - linkType: hard - "@types/cacheable-request@npm:^6.0.1": version: 6.0.3 resolution: "@types/cacheable-request@npm:6.0.3"