Add streamId field to objects (#155)

This commit is contained in:
Cristian Balas
2021-03-25 21:58:46 +02:00
committed by GitHub
parent 27ee0f2d73
commit dfded037f4
11 changed files with 308 additions and 60 deletions
@@ -16,7 +16,8 @@ const {
module.exports = {
Stream: {
async object( parent, args, context, info ) {
let obj = await getObject( { objectId: args.id } )
let obj = await getObject( { streamId: parent.id, objectId: args.id } )
obj.streamId = parent.id
return obj
}
},
@@ -24,12 +25,14 @@ module.exports = {
async children( parent, args, context, info ) {
// The simple query branch
if ( !args.query && !args.orderBy ) {
let result = await getObjectChildren( { objectId: parent.id, limit: args.limit, depth: args.depth, select: args.select, cursor: args.cursor } )
let result = await getObjectChildren( { streamId: parent.streamId, objectId: parent.id, limit: args.limit, depth: args.depth, select: args.select, cursor: args.cursor } )
result.objects.forEach( x => x.streamId = parent.streamId )
return { totalCount: parent.totalChildrenCount, cursor: result.cursor, objects: result.objects }
}
// The complex query branch
let result = await getObjectChildrenQuery( { objectId: parent.id, limit: args.limit, depth: args.depth, select: args.select, query: args.query, orderBy: args.orderBy, cursor: args.cursor } )
let result = await getObjectChildrenQuery( { streamId: parent.streamId, objectId: parent.id, limit: args.limit, depth: args.depth, select: args.select, query: args.query, orderBy: args.orderBy, cursor: args.cursor } )
result.objects.forEach( x => x.streamId = parent.streamId )
return result
}
},
@@ -39,7 +42,7 @@ module.exports = {
await validateScopes( context.scopes, 'streams:write' )
await authorizeResolver( context.userId, args.objectInput.streamId, 'stream:contributor' )
let ids = await createObjects( args.objectInput.objects )
let ids = await createObjects( args.objectInput.streamId, args.objectInput.objects )
return ids
}
}
@@ -0,0 +1,194 @@
// /* istanbul ignore file */
/* Migration steps:
// Remove foreign key constraint in commits
// Add objects.streamId column, remove existing primary key constraint, add unique composite index for (streamId, id)
// Fix closure table (add streamId column that refers to both parent and child, recreate all indexes to include it)
// Create new objects and new closures (starting from commits)
// Delete objects and closures that don't have streamId
// Set streamId as notNullable in closures
// delete composite index (streamId, id) and Create composite primary key on the same fields (unique index was used as a workaround bc we can't have composite PK with null values)
*/
exports.up = async ( knex ) => {
await knex.schema.alterTable( 'commits', table => {
table.dropForeign( 'referencedObject' )
} )
await knex.schema.alterTable( 'objects', table => {
table.string( 'streamId', 10 ).references( 'id' ).inTable( 'streams' ).onDelete( 'cascade' )
table.dropPrimary()
table.unique( [ 'streamId', 'id' ] )
table.index( 'id' )
table.index( 'streamId' )
} )
await knex.schema.alterTable( 'object_children_closure', table => {
/* Created with:
table.string( 'parent' ).notNullable( ).index( )
table.string( 'child' ).notNullable( ).index( )
table.integer( 'minDepth' ).defaultTo( 1 ).notNullable( ).index( )
table.unique( [ 'parent', 'child' ], 'obj_parent_child_index' )
table.index( [ 'parent', 'minDepth' ], 'full_pcd_index' )
*/
table.dropIndex( 'parent' )
table.dropIndex( 'child' )
table.dropIndex( 'minDepth' )
table.dropUnique( [ 'parent', 'child' ], 'obj_parent_child_index' )
table.dropIndex( [ 'parent', 'minDepth' ], 'full_pcd_index' )
table.string( 'streamId', 10 ).references( 'id' ).inTable( 'streams' ).onDelete( 'cascade' )
table.index( [ 'streamId', 'parent' ] )
table.index( [ 'streamId', 'child' ] )
table.index( [ 'streamId', 'minDepth' ] )
table.unique( [ 'streamId', 'parent', 'child' ], 'obj_parent_child_index' )
table.index( [ 'streamId', 'parent', 'minDepth' ], 'full_pcd_index' )
} )
await knex.raw( `
INSERT INTO objects
(
"streamId",
"id", "speckleType", "totalChildrenCount", "totalChildrenCountByDepth", "createdAt", "data"
)
SELECT
stream_commits."streamId",
O."id", O."speckleType", O."totalChildrenCount", O."totalChildrenCountByDepth", O."createdAt", O."data"
FROM
commits
INNER JOIN stream_commits on "id" = "commitId"
INNER JOIN objects O on "referencedObject" = O."id"
ON CONFLICT DO NOTHING
` )
await knex.raw( `
INSERT INTO objects
(
"streamId",
"id", "speckleType", "totalChildrenCount", "totalChildrenCountByDepth", "createdAt", "data"
)
SELECT
stream_commits."streamId",
O."id", O."speckleType", O."totalChildrenCount", O."totalChildrenCountByDepth", O."createdAt", O."data"
FROM
commits
INNER JOIN stream_commits ON "id" = "commitId"
INNER JOIN object_children_closure ON commits."referencedObject" = object_children_closure."parent"
INNER JOIN objects O on object_children_closure."child" = O."id"
ON CONFLICT DO NOTHING
` )
await knex.raw( `
INSERT INTO object_children_closure
(
"streamId", "parent", "child", "minDepth"
)
SELECT
O."streamId",
C."parent", C."child", C."minDepth"
FROM
object_children_closure C
INNER JOIN objects O ON "parent" = "id"
WHERE O."streamId" IS NOT NULL
` )
await knex.raw( `
DELETE FROM object_children_closure WHERE "streamId" IS NULL
` )
await knex.raw( `
DELETE FROM objects WHERE "streamId" IS NULL
` )
await knex.raw( `
ALTER TABLE object_children_closure ALTER COLUMN "streamId" SET NOT NULL;
` )
await knex.schema.alterTable( 'objects', table => {
table.dropUnique( [ 'streamId', 'id' ] )
table.primary( [ 'streamId', 'id' ] )
} )
}
/*
Revert data and schema
*/
exports.down = async ( knex ) => {
let hasColumn = await knex.schema.hasColumn( 'objects', 'streamId' )
if ( hasColumn ) {
await knex.schema.alterTable( 'objects', table => {
table.dropPrimary()
table.dropForeign( 'streamId' )
} )
await knex.raw( `
ALTER TABLE objects ALTER COLUMN "streamId" DROP NOT NULL;
` )
await knex.raw( `
ALTER TABLE object_children_closure ALTER COLUMN "streamId" DROP NOT NULL;
` )
await knex.raw( `
CREATE UNIQUE INDEX "tmp_uniqueid_for_null_stm_idx" ON objects ("id") WHERE "streamId" IS NULL
` )
await knex.raw( `
INSERT INTO objects
(
"id", "speckleType", "totalChildrenCount", "totalChildrenCountByDepth", "createdAt", "data"
)
SELECT "id", "speckleType", "totalChildrenCount", "totalChildrenCountByDepth", "createdAt", "data"
FROM
objects
ON CONFLICT DO NOTHING
` )
await knex.raw( `
CREATE UNIQUE INDEX "tmp_unique_pc_for_no_stream_idx" ON object_children_closure ("parent", "child") WHERE "streamId" IS NULL;
` )
await knex.raw( `
INSERT INTO object_children_closure ("parent", "child", "minDepth")
SELECT "parent", "child", "minDepth" FROM object_children_closure
ON CONFLICT DO NOTHING
` )
await knex.raw( `
DROP INDEX "tmp_uniqueid_for_null_stm_idx";
` )
await knex.raw( `
DROP INDEX "tmp_unique_pc_for_no_stream_idx";
` )
await knex.raw( `
DELETE FROM object_children_closure WHERE "streamId" IS NOT NULL
` )
await knex.raw( `
DELETE FROM objects WHERE "streamId" IS NOT NULL
` )
await knex.schema.alterTable( 'object_children_closure', table => {
table.dropIndex( [ 'streamId', 'parent' ] )
table.dropIndex( [ 'streamId', 'child' ] )
table.dropIndex( [ 'streamId', 'minDepth' ] )
table.dropUnique( [ 'streamId', 'parent', 'child' ], 'obj_parent_child_index' )
table.dropIndex( [ 'streamId', 'parent', 'minDepth' ], 'full_pcd_index' )
table.dropColumn( 'streamId' )
table.index( 'parent' )
table.index( 'child' )
table.index( 'minDepth' )
table.unique( [ 'parent', 'child' ], 'obj_parent_child_index' )
table.index( [ 'parent', 'minDepth' ], 'full_pcd_index' )
} )
await knex.schema.alterTable( 'objects', table => {
table.dropIndex( 'id' )
table.dropColumn( 'streamId' )
table.primary( 'id' )
} )
await knex.schema.alterTable( 'commits', table => {
table.foreign( 'referencedObject' ).references( 'id' ).inTable( 'objects' )
} )
}
}
@@ -41,7 +41,7 @@ module.exports = ( app ) => {
}
// Populate first object (the "commit")
let obj = await getObject( { objectId: req.params.objectId } )
let obj = await getObject( { streamId: req.params.streamId, objectId: req.params.objectId } )
if ( !obj ) {
return res.status( 404 ).send( `Failed to find object ${req.params.objectId}.` )
@@ -51,7 +51,7 @@ module.exports = ( app ) => {
let simpleText = req.headers.accept === 'text/plain'
let dbStream = await getObjectChildrenStream( { objectId: req.params.objectId } )
let dbStream = await getObjectChildrenStream( { streamId: req.params.streamId, objectId: req.params.objectId } )
let currentChunkSize = 0
let maxChunkSize = 50000
@@ -106,14 +106,15 @@ module.exports = ( app ) => {
k++
} catch ( e ) {
requestDropped = true
res.status( 400 ).send( 'Failed to find object, or object is corrupted.' )
debug( 'speckle:error' )( `'Failed to find object, or object is corrupted.' ${req.params.objectId}` )
return
}
} )
dbStream.on( 'error', err => {
debug( 'speckle:error' )( `Error in streaming object children for ${req.params.objectId}` )
debug( 'speckle:error' )( `Error in streaming object children for ${req.params.objectId}: ${err}` )
requestDropped = true
res.status( 400 ).send( 'Failed to find object, or object is corrupted.' )
return
} )
dbStream.on( 'end', ( ) => {
@@ -132,7 +133,7 @@ module.exports = ( app ) => {
app.get( '/objects/:streamId/:objectId/single', async ( req, res ) => {
// TODO: authN & authZ checks
let obj = await getObject( req.params.objectId )
let obj = await getObject( req.params.streamId, req.params.objectId )
res.send( obj )
} )
+2 -2
View File
@@ -64,7 +64,7 @@ module.exports = ( app ) => {
last = objs[ objs.length - 1 ]
totalProcessed += objs.length
let promise = createObjectsBatched( objs )
let promise = createObjectsBatched( req.params.streamId, objs )
promises.push( promise )
await promise
@@ -88,7 +88,7 @@ module.exports = ( app ) => {
last = objs[ objs.length - 1 ]
totalProcessed += objs.length
let promise = createObjectsBatched( objs )
let promise = createObjectsBatched( req.params.streamId, objs )
promises.push( promise )
await promise
@@ -20,7 +20,7 @@ module.exports = {
// If no total children count is passed in, get it from the original object
// that this commit references.
if ( !totalChildrenCount ){
let { totalChildrenCount: tc } = await getObject( {objectId} )
let { totalChildrenCount: tc } = await getObject( { streamId, objectId } )
totalChildrenCount = tc || 1
}
@@ -19,14 +19,14 @@ const StreamCommits = ( ) => knex( 'stream_commits' )
module.exports = {
async createObject( object ) {
let insertionObject = prepInsertionObject( object )
async createObject( streamId, object ) {
let insertionObject = prepInsertionObject( streamId, object )
let closures = [ ]
let totalChildrenCountByDepth = {}
if ( object.__closure !== null ) {
for ( const prop in object.__closure ) {
closures.push( { parent: insertionObject.id, child: prop, minDepth: object.__closure[ prop ] } )
closures.push( { streamId: streamId, parent: insertionObject.id, child: prop, minDepth: object.__closure[ prop ] } )
if ( totalChildrenCountByDepth[ object.__closure[ prop ].toString( ) ] )
totalChildrenCountByDepth[ object.__closure[ prop ].toString( ) ]++
@@ -52,20 +52,20 @@ module.exports = {
return insertionObject.id
},
async createObjectsBatched( objects ) {
async createObjectsBatched( streamId, objects ) {
let closures = [ ]
let objsToInsert = [ ]
let ids = [ ]
// Prep objects up
objects.forEach( obj => {
let insertionObject = prepInsertionObject( obj )
let insertionObject = prepInsertionObject( streamId, obj )
let totalChildrenCountGlobal = 0
let totalChildrenCountByDepth = {}
if ( obj.__closure !== null ) {
for ( const prop in obj.__closure ) {
closures.push( { parent: insertionObject.id, child: prop, minDepth: obj.__closure[ prop ] } )
closures.push( { streamId: streamId, parent: insertionObject.id, child: prop, minDepth: obj.__closure[ prop ] } )
totalChildrenCountGlobal++
if ( totalChildrenCountByDepth[ obj.__closure[ prop ].toString( ) ] )
totalChildrenCountByDepth[ obj.__closure[ prop ].toString( ) ]++
@@ -116,7 +116,7 @@ module.exports = {
return true
},
async createObjects( objects ) {
async createObjects( streamId, objects ) {
// TODO: Switch to knex batch inserting functionality
// see http://knexjs.org/#Utility-BatchInsert
let batches = [ ]
@@ -138,12 +138,12 @@ module.exports = {
let t0 = performance.now( )
batch.forEach( obj => {
let insertionObject = prepInsertionObject( obj )
let insertionObject = prepInsertionObject( streamId, obj )
let totalChildrenCountByDepth = {}
let totalChildrenCountGlobal = 0
if ( obj.__closure !== null ) {
for ( const prop in obj.__closure ) {
closures.push( { parent: insertionObject.id, child: prop, minDepth: obj.__closure[ prop ] } )
closures.push( { streamId: streamId, parent: insertionObject.id, child: prop, minDepth: obj.__closure[ prop ] } )
totalChildrenCountGlobal++
@@ -163,7 +163,7 @@ module.exports = {
objsToInsert.push( insertionObject )
ids.push( insertionObject.id )
} )
if ( objsToInsert.length > 0 ) {
let queryObjs = Objects( ).insert( objsToInsert ).toString( ) + ' on conflict do nothing'
await knex.raw( queryObjs )
@@ -185,24 +185,28 @@ module.exports = {
return ids
},
async getObject( { objectId } ) {
let res = await Objects( ).where( { id: objectId } ).select( '*' ).first( )
async getObject( { streamId, objectId } ) {
let res = await Objects( ).where( { streamId: streamId, id: objectId } ).select( '*' ).first( )
if ( !res ) return null
res.data.totalChildrenCount = res.totalChildrenCount // move this back
delete res.streamId // backwards compatibility
return res
},
async getObjectChildrenStream( { objectId } ) {
async getObjectChildrenStream( { streamId, objectId } ) {
let q = Closures( )
q.select( 'id' )
q.select( 'data' )
q.rightJoin( 'objects', 'objects.id', 'object_children_closure.child' )
.where( knex.raw( 'parent = ?', [ objectId ] ) )
q.rightJoin( 'objects', function() {
this.on( 'objects.streamId', '=', 'object_children_closure.streamId' )
.andOn( 'objects.id', '=', 'object_children_closure.child' )
} )
.where( knex.raw( 'object_children_closure."streamId" = ? AND parent = ?', [ streamId, objectId ] ) )
.orderBy( 'objects.id' )
return q.stream( )
},
async getObjectChildren( { objectId, limit, depth, select, cursor } ) {
async getObjectChildren( { streamId, objectId, limit, depth, select, cursor } ) {
limit = parseInt( limit ) || 50
depth = parseInt( depth ) || 1000
@@ -224,8 +228,11 @@ module.exports = {
q.select( 'data' )
}
q.rightJoin( 'objects', 'objects.id', 'object_children_closure.child' )
.where( knex.raw( 'parent = ?', [ objectId ] ) )
q.rightJoin( 'objects', function() {
this.on( 'objects.streamId', '=', 'object_children_closure.streamId' )
.andOn( 'objects.id', '=', 'object_children_closure.child' )
} )
.where( knex.raw( 'object_children_closure."streamId" = ? AND parent = ?', [ streamId, objectId ] ) )
.andWhere( knex.raw( '"minDepth" < ?', [ depth ] ) )
.andWhere( knex.raw( 'id > ?', [ cursor ? cursor : '0' ] ) )
.orderBy( 'objects.id' )
@@ -253,7 +260,7 @@ module.exports = {
// This query is inefficient on larger sets (n * 10k objects) as we need to return the total count on an arbitrarily (user) defined selection of objects.
// A possible future optimisation route would be to cache the total count of a query (as objects are immutable, it will not change) on a first run, and, if found on a subsequent round, do a simpler query and merge the total count result.
async getObjectChildrenQuery( { objectId, limit, depth, select, cursor, query, orderBy } ) {
async getObjectChildrenQuery( { streamId, objectId, limit, depth, select, cursor, query, orderBy } ) {
limit = parseInt( limit ) || 50
depth = parseInt( depth ) || 1000
orderBy = orderBy || { field: 'id', direction: 'asc' }
@@ -298,8 +305,12 @@ module.exports = {
}
// join on objects table
cteInnerQuery.join( 'objects', 'child', '=', 'objects.id' )
.where( 'parent', objectId )
cteInnerQuery.join( 'objects', function() {
this.on( 'objects.streamId', '=', 'object_children_closure.streamId' )
.andOn( 'objects.id', '=', 'object_children_closure.child' )
} )
.where( 'object_children_closure.streamId', streamId )
.andWhere( 'parent', objectId )
.andWhere( 'minDepth', '<', depth )
// Add user provided filters/queries.
@@ -417,8 +428,11 @@ module.exports = {
return { totalCount, objects: rows, cursor: rows.length === limit ? cursorEncoded : null }
},
async getObjects( objectIds ) {
let res = await Objects( ).whereIn( 'id', objectIds ).select( '*' )
async getObjects( streamId, objectIds ) {
let res = await Objects( )
.whereIn( 'id', objectIds )
.andWhere( 'streamId', streamId )
.select( 'id', 'speckleType', 'totalChildrenCount', 'totalChildrenCountByDepth', 'createdAt', 'data' )
return res
},
@@ -431,7 +445,7 @@ module.exports = {
// Note: we're generating the hash here, rather than on the db side, as there are
// limitations when doing upserts - ignored fields are not always returned, hence
// we cannot provide a full response back including all object hashes.
function prepInsertionObject( obj ) {
function prepInsertionObject( streamId, obj ) {
let memNow = process.memoryUsage( ).heapUsed / 1024 / 1024
if ( obj.hash )
@@ -444,6 +458,7 @@ function prepInsertionObject( obj ) {
return {
data: stringifiedObj, // stored in jsonb column
streamId: streamId,
id: obj.id,
speckleType: obj.speckleType
}
@@ -49,7 +49,7 @@ describe( 'Branches @core-branches', ( ) => {
user.id = await createUser( user )
stream.id = await createStream( { ...stream, ownerId: user.id } )
testObject.id = await createObject( testObject )
testObject.id = await createObject( stream.id, testObject )
} )
after( async ( ) => {
@@ -68,9 +68,9 @@ describe( 'Commits @core-commits', ( ) => {
user.id = await createUser( user )
stream.id = await createStream( { ...stream, ownerId: user.id } )
testObject.id = await createObject( testObject )
testObject2.id = await createObject( testObject2 )
testObject3.id = await createObject( testObject3 )
testObject.id = await createObject( stream.id, testObject )
testObject2.id = await createObject( stream.id, testObject2 )
testObject3.id = await createObject( stream.id, testObject3 )
} )
after( async ( ) => {
@@ -115,7 +115,7 @@ describe( 'Commits @core-commits', ( ) => {
it( 'Should get the commits from a branch', async ( ) => {
for ( let i = 0; i < 10; i++ ) {
let t = { qux: i }
t.id = await createObject( t )
t.id = await createObject( stream.id, t )
await createCommitByBranchName( { streamId: stream.id, branchName: 'main', message: `commit # ${i+3}`, sourceApplication: 'tests', objectId: t.id, authorId: user.id } )
}
@@ -138,7 +138,7 @@ describe( 'Commits @core-commits', ( ) => {
let prevId
for ( let i = 0; i < 10; i++ ) {
let t = { thud: i }
t.id = await createObject( t )
t.id = await createObject( stream.id, t )
await createCommitByBranchName( { streamId: stream.id, branchName: 'dim/dev', message: `pushed something # ${i+3}`, sourceApplication: 'tests', objectId: t.id, authorId: user.id } )
}
@@ -165,7 +165,8 @@ describe( 'Commits @core-commits', ( ) => {
it( 'Should get the public commits of an user only', async ( ) => {
let privateStreamId = await createStream( { name: 'private', isPublic: false, ownerId: user.id } )
let commitId = await createCommitByBranchName( { streamId: privateStreamId, branchName: 'main', message: 'first commit', sourceApplication: 'tests', objectId: testObject.id, authorId: user.id } )
let objectId = await createObject( privateStreamId, testObject )
let commitId = await createCommitByBranchName( { streamId: privateStreamId, branchName: 'main', message: 'first commit', sourceApplication: 'tests', objectId, authorId: user.id } )
let { commits, cursor } = await getCommitsByUserId( { userId: user.id, limit: 1000 } )
expect( commits.length ).to.equal( 23 )
@@ -75,8 +75,8 @@ describe( 'Objects @core-objects', ( ) => {
} )
it( 'Should create objects', async ( ) => {
sampleObject.id = await createObject( sampleObject )
sampleCommit.id = await createObject( sampleCommit )
sampleObject.id = await createObject( stream.id, sampleObject )
sampleCommit.id = await createObject( stream.id, sampleCommit )
} )
let objCount_1 = 10
@@ -92,7 +92,7 @@ describe( 'Objects @core-objects', ( ) => {
} )
}
let ids = await createObjects( objs )
let ids = await createObjects( stream.id, objs )
expect( ids ).to.have.lengthOf( objCount_1 )
@@ -116,7 +116,7 @@ describe( 'Objects @core-objects', ( ) => {
} )
}
let myIds = await createObjects( objs2 )
let myIds = await createObjects( stream.id, objs2 )
myIds.forEach( ( h, i ) => objs2[ i ].id = h )
@@ -125,12 +125,12 @@ describe( 'Objects @core-objects', ( ) => {
} ).timeout( 30000 )
it( 'Should get a single object', async ( ) => {
let obj = await getObject( { objectId: sampleCommit.id } )
let obj = await getObject( { streamId: stream.id, objectId: sampleCommit.id } )
expect( obj ).to.not.be.null
} )
it( 'Should get more objects', async ( ) => {
let myObjs = await getObjects( objs.map( o => o.id ) )
let myObjs = await getObjects( stream.id, objs.map( o => o.id ) )
expect( myObjs ).to.have.lengthOf( objs.length )
let match1 = myObjs.find( o => o.id === objs[ 0 ].id )
@@ -147,7 +147,7 @@ describe( 'Objects @core-objects', ( ) => {
it( 'Should get object children', async ( ) => {
let objs_1 = createManyObjects( 100, 'noise__' )
let ids = await createObjects( objs_1 )
let ids = await createObjects( stream.id, objs_1 )
// console.log( ids )
// console.log(ids[ 0 ])
@@ -162,7 +162,7 @@ describe( 'Objects @core-objects', ( ) => {
// let { rows } = await getObjectChildren( { objectId: ids[ 0 ] } )
let limit = 50
let { objects: rows_1, cursor: cursor_1 } = await getObjectChildren( { limit, objectId: ids[ 0 ], select: [ 'nest.mallard', 'test.value', 'test.secondValue', 'nest.arr[0]', 'nest.arr[1]' ] } )
let { objects: rows_1, cursor: cursor_1 } = await getObjectChildren( { streamId: stream.id, limit, objectId: ids[ 0 ], select: [ 'nest.mallard', 'test.value', 'test.secondValue', 'nest.arr[0]', 'nest.arr[1]' ] } )
expect( rows_1.length ).to.equal( limit )
expect( rows_1[ 0 ] ).to.be.an( 'object' )
@@ -172,7 +172,7 @@ describe( 'Objects @core-objects', ( ) => {
expect( cursor_1 ).to.be.a( 'string' )
let { objects: rows_2, cursor: cursor_2 } = await getObjectChildren( { limit, objectId: ids[ 0 ], select: [ 'nest.mallard', 'test.value', 'test.secondValue', 'nest.arr[0]', 'nest.arr[1]' ], cursor: cursor_1 } )
let { objects: rows_2, cursor: cursor_2 } = await getObjectChildren( { streamId: stream.id, limit, objectId: ids[ 0 ], select: [ 'nest.mallard', 'test.value', 'test.secondValue', 'nest.arr[0]', 'nest.arr[1]' ], cursor: cursor_1 } )
expect( rows_2.length ).to.equal( 50 )
expect( rows_2[ 0 ] ).to.be.an( 'object' )
@@ -181,7 +181,7 @@ describe( 'Objects @core-objects', ( ) => {
expect( rows_2[ 0 ] ).to.have.nested.property( 'data.nest.mallard' )
let { objects, cursor } = await getObjectChildren( { objectId: ids[ 0 ], limit: 1000 } )
let { objects, cursor } = await getObjectChildren( { streamId: stream.id, objectId: ids[ 0 ], limit: 1000 } )
expect( objects.length ).to.equal( 100 )
parentObjectId = ids[ 0 ]
@@ -192,6 +192,7 @@ describe( 'Objects @core-objects', ( ) => {
// we're assuming the prev test objects exist
let test = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
select: [ 'id', 'test.value' ],
limit: 3,
@@ -200,6 +201,7 @@ describe( 'Objects @core-objects', ( ) => {
} )
let test2 = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
select: [ 'id', 'test.value', 'nest.duck' ],
limit: 40,
@@ -237,6 +239,7 @@ describe( 'Objects @core-objects', ( ) => {
// Note: the `similar` field is incremented on i%3===0, resulting in a pattern of 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, etc.
let test3 = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
// select: [ 'similar', 'id' ],
query: [ { field: 'similar', operator: '>=', value: 0 }, { field: 'similar', operator: '<', value: 100 } ],
@@ -245,6 +248,7 @@ describe( 'Objects @core-objects', ( ) => {
} )
let test4 = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
// select: [ 'similar', 'id' ],
query: [ { field: 'similar', operator: '>=', value: 0 }, { field: 'similar', operator: '<', value: 100 } ],
@@ -276,6 +280,7 @@ describe( 'Objects @core-objects', ( ) => {
it( 'should query object children with no results ', async ( ) => {
let test = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
query: [ { field: 'test.value', operator: '>=', value: 10 }, { field: 'test.value', operator: '<', value: 9 } ],
orderBy: { field: 'test.value', direction: 'desc' }
@@ -288,6 +293,7 @@ describe( 'Objects @core-objects', ( ) => {
it( 'should not allow invalid query operators ', async ( ) => {
try {
let test = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
query: [ { field: 'test.value', operator: '> 0; BOBBY DROPPPPED MY TABLES; -- and the bass?', value: 10 }, { field: 'test.value', operator: '<', value: 9 } ],
orderBy: { field: 'test.value', direction: 'desc' }
@@ -301,6 +307,7 @@ describe( 'Objects @core-objects', ( ) => {
it( 'should query childern and sort them by a boolean value ', async ( ) => {
let test = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
limit: 5,
select: [ 'test.value', 'nest.duck' ],
@@ -309,6 +316,7 @@ describe( 'Objects @core-objects', ( ) => {
} )
let test2 = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
limit: 5,
select: [ 'test.value', 'nest.duck' ],
@@ -326,6 +334,7 @@ describe( 'Objects @core-objects', ( ) => {
let limVal = 20
let test = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
limit: 5,
query: [ { field: 'test.value', operator: '<', value: limVal } ],
@@ -333,6 +342,7 @@ describe( 'Objects @core-objects', ( ) => {
} )
let test2 = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
limit: 5,
query: [ { field: 'test.value', operator: '<', value: limVal } ],
@@ -353,6 +363,7 @@ describe( 'Objects @core-objects', ( ) => {
it( 'should query childern and sort them by id by default ', async ( ) => {
let test = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
limit: 3,
query: [ { field: 'test.value', operator: '>=', value: 10 }, { field: 'test.value', operator: '<', value: 100 } ],
@@ -361,6 +372,7 @@ describe( 'Objects @core-objects', ( ) => {
expect( test.totalCount ).to.equal( 90 )
let test2 = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
limit: 3,
query: [ { field: 'test.value', operator: '>=', value: 10 }, { field: 'test.value', operator: '<', value: 100 } ],
@@ -373,12 +385,14 @@ describe( 'Objects @core-objects', ( ) => {
it( 'should just order results by something', async ( ) => {
let test = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
limit: 2,
orderBy: { field: 'test.value', direction: 'desc' }
} )
let test2 = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
limit: 2,
orderBy: { field: 'test.value', direction: 'desc' },
@@ -388,12 +402,14 @@ describe( 'Objects @core-objects', ( ) => {
expect( test.objects[ 1 ].data.test.value ).to.equal( test2.objects[ 0 ].data.test.value + 1 ) // continuity check
let test3 = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
limit: 50,
orderBy: { field: 'nest.duck', direction: 'desc' }
} )
let test4 = await getObjectChildrenQuery( {
streamId: stream.id,
objectId: parentObjectId,
limit: 50,
orderBy: { field: 'nest.duck', direction: 'desc' },
@@ -410,17 +426,17 @@ describe( 'Objects @core-objects', ( ) => {
let objs = createManyObjects( 3333, 'perlin merlin magic' )
commitId = objs[ 0 ].id
await createObjectsBatched( objs )
await createObjectsBatched( stream.id, objs )
let parent = await getObject( { objectId: commitId } )
let parent = await getObject( { streamId: stream.id, objectId: commitId } )
expect( parent.totalChildrenCount ).to.equal( 3333 )
let commitChildren = await getObjectChildren( { objectId: commitId, limit: 2 } )
let commitChildren = await getObjectChildren( { streamId: stream.id, objectId: commitId, limit: 2 } )
expect( commitChildren.objects.length ).to.equal( 2 )
} )
it( 'should stream objects back', ( done ) => {
let tcount = 0
getObjectChildrenStream( { objectId: commitId } )
getObjectChildrenStream( { streamId: stream.id, objectId: commitId } )
.then( stream => {
stream.on( 'data', row => tcount++ )
stream.on( 'end', ( ) => {
@@ -450,7 +466,7 @@ describe( 'Objects @core-objects', ( ) => {
let promisses = []
for ( let i = 0; i < shuffledVersions.length; i++ ) {
let promise = createObjectsBatched( shuffledVersions[i] )
let promise = createObjectsBatched( stream.id, shuffledVersions[i] )
promise.catch( ( e ) => { } )
promisses.push( promise )
}
@@ -92,6 +92,24 @@ describe( 'Upload/Download Routes @api-rest', ( ) => {
} )
it( 'Should not allow getting an object that is not part of the stream', async ( ) => {
let objBatch = createManyObjects( 20 )
await request( expressApp )
.post( `/objects/${privateTestStream.id}` )
.set( 'Authorization', userA.token )
.set( 'Content-type', 'multipart/form-data' )
.attach( 'batch1', Buffer.from( JSON.stringify( objBatch ), 'utf8' ) )
// should allow userA to access privateTestStream object
res = await chai.request( expressApp ).get( `/objects/${privateTestStream.id}/${objBatch[0].id}` ).set( 'Authorization', userA.token )
expect( res ).to.have.status( 200 )
// should not allow userB to access privateTestStream object by pretending it's in public stream
res = await chai.request( expressApp ).get( `/objects/${testStream.id}/${objBatch[0].id}` ).set( 'Authorization', userB.token )
expect( res ).to.have.status( 404 )
} )
it( 'Should not allow upload requests without an authorization token or valid streamId', async ( ) => {
// invalid token and streamId
let res = await chai.request( expressApp ).post( '/objects/wow_hack' ).set( 'Authorization', 'this is a hoax' )
@@ -194,7 +194,7 @@ describe( 'Streams @core-streams', ( ) => {
await sleep( 100 )
let testObject = { foo: 'bar', baz: 'qux' }
testObject.id = await createObject( testObject )
testObject.id = await createObject( s.id, testObject )
commitId1 = await createCommitByBranchName( { streamId: s.id, branchName: 'main', message: 'first commit', objectId: testObject.id, authorId: userOne.id, sourceApplication: 'tests' } )
su = await getStream( { streamId: s.id } )