fix(subscriptions): added stream deleted subscription

This commit is contained in:
Dimitrie Stefanescu
2020-09-01 10:58:29 +01:00
parent 1b57a2ba4b
commit 11571a1d76
5 changed files with 197 additions and 134 deletions
+33 -14
View File
@@ -24,6 +24,12 @@ const STREAM_DELETED = 'STREAM_DELETED'
const STREAM_PERMISSION_GRANTED = 'STREAM_PERMISSION_GRANTED'
const STREAM_PERMISSION_REVOKED = 'STREAM_PERMISSION_REVOKED'
function sleep( ms ) {
return new Promise( ( resolve ) => {
setTimeout( resolve, ms )
} )
}
module.exports = {
Query: {
@@ -90,15 +96,18 @@ module.exports = {
async streamDelete( parent, args, context, info ) {
await authorizeResolver( context.userId, args.id, 'stream:owner' )
// TODO: Notify all stream users
// Notify any listeners on the streamId
await pubsub.publish( STREAM_DELETED, { streamDeleted: { streamId: args.id }, streamId: args.id } )
// Notify all stream users
let users = await getStreamUsers( { streamId: args.id } )
for ( let user of users ) {
await pubsub.publish( USER_STREAM_DELETED, { userStreamDeleted: { streamId: args.id }, ownerId: user.id } )
}
// TODO: Notify any listeners on the streamId
await pubsub.publish( STREAM_DELETED, { streamDeleted: args.id, streamId: args.id } )
// delay deletion by a bit so we can do auth checks
await sleep( 250 )
// Delete after event so we can do authz
await deleteStream( { streamId: args.id } )
@@ -115,7 +124,9 @@ module.exports = {
if ( granted ) {
await pubsub.publish( STREAM_PERMISSION_GRANTED, {
streamPermissionGranted: { ...params, grantor: context.userId }, userId: params.userId, streamId: params.streamId
streamPermissionGranted: { ...params, grantor: context.userId },
userId: params.userId,
streamId: params.streamId
} )
}
@@ -128,7 +139,9 @@ module.exports = {
if ( revoked ) {
await pubsub.publish( STREAM_PERMISSION_REVOKED, {
streamPermissionRevoked: { ...args.permissionParams }, userId: args.permissionParams.userId, streamId: args.permissionParams.streamId
streamPermissionRevoked: { ...args.permissionParams },
userId: args.permissionParams.userId,
streamId: args.permissionParams.streamId
} )
}
@@ -139,7 +152,14 @@ module.exports = {
Subscription: {
userStreamCreated: {
subscribe: withFilter( () => pubsub.asyncIterator( [ USER_STREAM_CREATED ] ),
subscribe: withFilter( ( ) => pubsub.asyncIterator( [ USER_STREAM_CREATED ] ),
( payload, variables, context ) => {
return payload.ownerId === context.userId
} )
},
userStreamDeleted: {
subscribe: withFilter( ( ) => pubsub.asyncIterator( [ USER_STREAM_DELETED ] ),
( payload, variables, context ) => {
return payload.ownerId === context.userId
} )
@@ -150,31 +170,30 @@ module.exports = {
( ) => pubsub.asyncIterator( [ STREAM_UPDATED ] ),
async ( payload, variables, context ) => {
await authorizeResolver( context.userId, payload.streamId, 'stream:reviewer' )
return payload.streamId === variables.streamId
} )
},
userStreamDeleted: {
subscribe: withFilter( () => pubsub.asyncIterator( [ USER_STREAM_DELETED ] ),
( payload, variables, context ) => {
return payload.ownerId === context.userId
streamDeleted: {
subscribe: withFilter( ( ) => pubsub.asyncIterator( [ STREAM_DELETED ] ),
async ( payload, variables, context ) => {
await authorizeResolver( context.userId, payload.streamId, 'stream:reviewer' )
return payload.streamId === variables.streamId
} )
},
streamPermissionGranted: {
subscribe: withFilter( () => pubsub.asyncIterator( [ STREAM_PERMISSION_GRANTED ] ),
subscribe: withFilter( ( ) => pubsub.asyncIterator( [ STREAM_PERMISSION_GRANTED ] ),
( payload, variables ) => {
return payload.userId === variables.userId
} )
},
streamPermissionRevoked: {
subscribe: withFilter( () => pubsub.asyncIterator( [ STREAM_PERMISSION_REVOKED ] ),
subscribe: withFilter( ( ) => pubsub.asyncIterator( [ STREAM_PERMISSION_REVOKED ] ),
( payload, variables ) => {
return payload.userId === variables.userId
} )
}
}
}
+26 -23
View File
@@ -43,31 +43,31 @@ extend type Mutation {
Creates a new stream.
"""
streamCreate( stream: StreamCreateInput! ): String
@hasRole(role: "server:user")
@hasRole(role: "server:user")
@hasScope(scope: "streams:write")
"""
Updates an existing stream.
"""
streamUpdate( stream: StreamUpdateInput! ): Boolean!
@hasRole(role: "server:user")
@hasRole(role: "server:user")
@hasScope(scope: "streams:write")
"""
Deletes an existing stream.
"""
streamDelete( id: String! ): Boolean!
@hasRole(role: "server:user")
@hasRole(role: "server:user")
@hasScope(scope: "streams:write")
"""
Grants permissions to a user on a given stream.
"""
streamGrantPermission( permissionParams: StreamGrantPermissionInput! ): Boolean
@hasRole(role: "server:user")
@hasRole(role: "server:user")
@hasScope(scope: "streams:write")
"""
Revokes the permissions of a user on a given stream.
"""
streamRevokePermission( permissionParams: StreamRevokePermissionInput! ): Boolean
@hasRole(role: "server:user")
@hasRole(role: "server:user")
@hasScope(scope: "streams:write")
}
@@ -75,48 +75,51 @@ extend type Subscription {
#
# User bound subscriptions that operate on the stream collection of an user
# Example relevant view/usecase: updates when working in GH or Dynamo
# Example relevant view/usecase: updating the list of streams for a user.
#
# Source:
# - stream created mutation (target: stream creator)
# - stream grant permissions (target: grantee id)
#
# Payload: streamId and sreamCreate input args
# 
# As per discussion, removed `ownerId` input so this now just works for the current user
"""
Subscribes to new stream created event for a given user.
Subscribes to new stream created event for your profile. Use this to display an up-to-date list of streams for your profile.
"""
userStreamCreated: JSONObject
@hasRole(role: "server:user")
@hasScope(scope: "profile:read")
# Source:
# - stream delete mutation (target: all stream users in acl)
# - stream revoke permissions (target: grantee id)
#
# Payload: streamId
"""
Subscribes to stream deleted event for a given user.
Subscribes to stream deleted event for your profile. Use this to display an up-to-date list of streams for your profile.
"""
userStreamDeleted: JSONObject
@hasRole(role: "server:user")
@hasScope(scope: "profile:read")
#
# Stream bound subscriptions that operate on the stream itself.
# Example relevant view/usecase: a single stream connector, or view, or component in a web app
#
"""
Subscribes to stream updated event.
Subscribes to stream updated event. Use this in clients/components that pertain only to this stream.
"""
streamUpdated( streamId: String ): JSONObject
@hasRole(role: "server:user")
@hasScope(scope: "streams:read")
"""
Subscribes to stream permission granted event.
Subscribes to stream deleted event. Use this in clients/components that pertain only to this stream.
"""
streamDeleted( streamId: String ): JSONObject
@hasRole(role: "server:user")
@hasScope(scope: "streams:read")
"""
Subscribes to stream permission granted event. Use this to display an up-to-date list of streams for your profile.
"""
streamPermissionGranted( userId: String! ): JSONObject
@hasRole(role: "server:user")
@hasScope(scope: "profile:read")
"""
Subscribes to stream delete event.
Subscribes to stream delete event. Use this to display an up-to-date list of streams for your profile.
"""
streamPermissionRevoked( userId: String! ): JSONObject
@hasRole(role: "server:user")
+26 -26
View File
@@ -4,8 +4,8 @@ const crs = require( 'crypto-random-string' )
const appRoot = require( 'app-root-path' )
const knex = require( `${appRoot}/db/knex` )
const Users = () => knex( 'users' )
const Acl = () => knex( 'server_acl' )
const Users = ( ) => knex( 'users' )
const Acl = ( ) => knex( 'server_acl' )
module.exports = {
@@ -16,61 +16,61 @@ module.exports = {
*/
async createUser( user ) {
let [ {count} ] = await Acl().where( {role: 'server:admin'} ).count()
let [ { count } ] = await Acl( ).where( { role: 'server:admin' } ).count( )
user.id = crs( {length: 10} )
user.id = crs( { length: 10 } )
if ( user.password ) {
user.passwordDigest = await bcrypt.hash( user.password, 10 )
}
delete user.password
let usr = await Users().select( 'id' ).where( {email: user.email} ).first()
let usr = await Users( ).select( 'id' ).where( { email: user.email } ).first( )
if ( usr ) throw new Error( 'Email taken. Try logging in?' )
let res = await Users().returning( 'id' ).insert( user )
let res = await Users( ).returning( 'id' ).insert( user )
if ( parseInt( count ) === 0 ) {
await Acl().insert( {userId: res[0], role: 'server:admin'} )
await Acl( ).insert( { userId: res[ 0 ], role: 'server:admin' } )
} else {
await Acl().insert( {userId: res[0], role: 'server:user'} )
await Acl( ).insert( { userId: res[ 0 ], role: 'server:user' } )
}
return res[0]
return res[ 0 ]
},
async findOrCreateUser( {user, rawProfile} ) {
let existingUser = await Users().select( 'id' ).where( {email: user.email} ).first()
async findOrCreateUser( { user, rawProfile } ) {
let existingUser = await Users( ).select( 'id' ).where( { email: user.email } ).first( )
if ( existingUser )
return existingUser
user.password = crs( {length: 20} )
user.password = crs( { length: 20 } )
user.verified = true // because we trust the external identity provider, no?
return {id: await module.exports.createUser( user )}
return { id: await module.exports.createUser( user ) }
},
async getUserById( {userId} ) {
let user = await Users().where( {id: userId} ).select( '*' ).first()
async getUserById( { userId } ) {
let user = await Users( ).where( { id: userId } ).select( '*' ).first( )
delete user.passwordDigest
return user
},
// TODO: deprecate
async getUser( id ) {
let user = await Users().where( {id: id} ).select( '*' ).first()
let user = await Users( ).where( { id: id } ).select( '*' ).first( )
delete user.passwordDigest
return user
},
async getUserByEmail( {email} ) {
let user = await Users().where( {email: email} ).select( '*' ).first()
async getUserByEmail( { email } ) {
let user = await Users( ).where( { email: email } ).select( '*' ).first( )
delete user.passwordDigest
return user
},
async getUserRole( id ) {
let {role} = await Acl().where( {userId: id} ).select( 'role' ).first()
let { role } = await Acl( ).where( { userId: id } ).select( 'role' ).first( )
return role
},
@@ -79,16 +79,16 @@ module.exports = {
delete user.passwordDigest
delete user.password
delete user.email
await Users().where( {id: id} ).update( user )
await Users( ).where( { id: id } ).update( user )
},
async searchUsers( searchQuery, limit, cursor ) {
limit = limit || 25
let query = Users()
let query = Users( )
.select( 'id', 'username', 'name', 'bio', 'company', 'verified', 'avatar', 'createdAt' )
.where( queryBuilder => {
queryBuilder.where( {email: searchQuery} ) //match full email or partial username / name
queryBuilder.where( { email: searchQuery } ) //match full email or partial username / name
queryBuilder.orWhere( 'username', 'ILIKE', `%${searchQuery}%` )
queryBuilder.orWhere( 'name', 'ILIKE', `%${searchQuery}%` )
} )
@@ -99,15 +99,15 @@ module.exports = {
query.orderBy( 'users.createdAt', 'desc' ).limit( limit )
let rows = await query
return {users: rows, cursor: rows.length > 0 ? rows[rows.length - 1].createdAt.toISOString() : null}
return { users: rows, cursor: rows.length > 0 ? rows[ rows.length - 1 ].createdAt.toISOString( ) : null }
},
async validatePasssword( {email, password} ) {
let {passwordDigest} = await Users().where( {email: email} ).select( 'passwordDigest' ).first()
async validatePasssword( { email, password } ) {
let { passwordDigest } = await Users( ).where( { email: email } ).select( 'passwordDigest' ).first( )
return bcrypt.compare( password, passwordDigest )
},
async deleteUser( id ) {
throw new Error( 'not implemented' )
}
}
}
+110 -69
View File
@@ -32,10 +32,10 @@ describe( 'GraphQL API Subscriptions', ( ) => {
const getWsClient = ( wsurl, authToken ) => {
const client = new SubscriptionClient( wsAddr, {
reconnect: true,
connectionParams: { headers: { Authorization: authToken } }
},
ws )
reconnect: true,
connectionParams: { headers: { Authorization: authToken } }
},
ws )
return client
}
@@ -59,7 +59,7 @@ describe( 'GraphQL API Subscriptions', ( ) => {
// console.error( `stderr: ${data}` )
// } )
await sleep( 5000 )
await sleep( 3000 )
userA.id = await createUser( userA )
let token = await createPersonalAccessToken( userA.id, 'test token user A', [ 'streams:read', 'streams:write', 'users:read', 'users:email', 'tokens:write', 'tokens:read', 'profile:read', 'profile:email' ] )
@@ -77,7 +77,7 @@ describe( 'GraphQL API Subscriptions', ( ) => {
} )
describe( 'Streams', ( ) => {
it( 'Should be notified when a stream is created', async ( ) => {
it( 'A user (me) should be notified when a stream is created', async ( ) => {
let eventNum = 0
const query = gql `subscription mySub { userStreamCreated }`
const client = createSubscriptionObservable( wsAddr, userA.token, query )
@@ -102,6 +102,36 @@ describe( 'GraphQL API Subscriptions', ( ) => {
consumer.unsubscribe( )
} ).timeout( 5000 )
it( 'A user (me) should be notified when a stream is deleted', async ( ) => {
const sc1 = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
const sc2 = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
const sid1 = sc1.body.data.streamCreate
const sid2 = sc2.body.data.streamCreate
let eventNum = 0
const query = gql `subscription userStreamDeleted { userStreamDeleted }`
const client = createSubscriptionObservable( wsAddr, userA.token, query )
const consumer = client.subscribe( eventData => {
expect( eventData.data.userStreamDeleted ).to.exist
eventNum++
} )
await sleep( 500 )
let sd1 = await sendRequest( userA.token, { query: `mutation { streamDelete(id: "${sid1}" ) }` } )
.expect( 200 )
.expect( noErrors )
let sd2 = await sendRequest( userA.token, { query: `mutation { streamDelete(id: "${sid2}" ) }` } )
.expect( 200 )
.expect( noErrors )
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 2 )
consumer.unsubscribe( )
} ).timeout( 5000 )
it( 'Should be notified when a stream is updated', async ( ) => {
const resSC = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
const streamId = resSC.body.data.streamCreate
@@ -132,44 +162,39 @@ describe( 'GraphQL API Subscriptions', ( ) => {
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 3 )
consumer.unsubscribe( )
} )
} ).timeout( 5000 )
it( 'Should be notified when a stream is deleted', async ( ) => {
const sc1 = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
const sc2 = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
const sid1 = sc1.body.data.streamCreate
const sid2 = sc2.body.data.streamCreate
const resSC = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
const streamId = resSC.body.data.streamCreate
let eventNum = 0
const query = gql `subscription userStreamDeleted { userStreamDeleted }`
const query = gql `subscription streamDeleted { streamDeleted( streamId: "${streamId}" ) }`
const client = createSubscriptionObservable( wsAddr, userA.token, query )
const consumer = client.subscribe( eventData => {
expect( eventData.data.userStreamDeleted ).to.exist
expect( eventData.data.streamDeleted ).to.exist
eventNum++
} )
await sleep( 500 )
let sd1 = await sendRequest( userA.token, { query: `mutation { streamDelete(id: "${sid1}" ) }` } )
.expect( 200 )
.expect( noErrors )
let sd2 = await sendRequest( userA.token, { query: `mutation { streamDelete(id: "${sid2}" ) }` } )
const resSU = await sendRequest( userA.token, { query: `mutation { streamDelete( id: "${streamId}" ) }` } )
.expect( 200 )
.expect( noErrors )
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 2 )
expect( eventNum ).to.equal( 1 )
consumer.unsubscribe( )
} )
} ).timeout( 5000 )
it( 'Should be notified when stream permission is granted', async () => {
it( 'Should be notified when stream permission is granted', async ( ) => {
const resSC = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
const streamId = resSC.body.data.streamCreate
let eventNum = 0
const query = gql`subscription permissionGranted { streamPermissionGranted( userId: "${userB.id}" ) }`
const query = gql `subscription permissionGranted { streamPermissionGranted( userId: "${userB.id}" ) }`
const client = createSubscriptionObservable( wsAddr, userB.token, query )
const consumer = client.subscribe( eventData => {
expect( eventData.data.streamPermissionGranted ).to.exist
@@ -180,21 +205,22 @@ describe( 'GraphQL API Subscriptions', ( ) => {
let sg =
await sendRequest( userA.token, {
query: `mutation { streamGrantPermission( permissionParams: {streamId: "${streamId}", userId: "${userB.id}", role: "stream:contributor"} ) }` } )
.expect( 200 )
.expect( noErrors )
query: `mutation { streamGrantPermission( permissionParams: {streamId: "${streamId}", userId: "${userB.id}", role: "stream:contributor"} ) }`
} )
.expect( 200 )
.expect( noErrors )
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 1 )
consumer.unsubscribe( )
} )
} ).timeout( 5000 )
it( 'Should be notified when stream permission is revoked', async () => {
it( 'Should be notified when stream permission is revoked', async ( ) => {
const resSC = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
const streamId = resSC.body.data.streamCreate
let eventNum = 0
const query = gql`subscription permissionRevoked { streamPermissionRevoked( userId: "${userB.id}" ) }`
const query = gql `subscription permissionRevoked { streamPermissionRevoked( userId: "${userB.id}" ) }`
const client = createSubscriptionObservable( wsAddr, userB.token, query )
const consumer = client.subscribe( eventData => {
expect( eventData.data.streamPermissionRevoked ).to.exist
@@ -204,22 +230,24 @@ describe( 'GraphQL API Subscriptions', ( ) => {
await sleep( 500 )
let sg = await sendRequest( userA.token, {
query: `mutation { streamGrantPermission( permissionParams: {streamId: "${streamId}", userId: "${userB.id}", role: "stream:contributor"} ) }` } )
query: `mutation { streamGrantPermission( permissionParams: {streamId: "${streamId}", userId: "${userB.id}", role: "stream:contributor"} ) }`
} )
.expect( 200 )
.expect( noErrors )
let sr = await sendRequest( userA.token, {
query: `mutation { streamRevokePermission( permissionParams: {streamId: "${streamId}", userId: "${userB.id}"} ) }` } )
query: `mutation { streamRevokePermission( permissionParams: {streamId: "${streamId}", userId: "${userB.id}"} ) }`
} )
.expect( 200 )
.expect( noErrors )
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 1 )
consumer.unsubscribe( )
} )
} ).timeout( 5000 )
it( 'Should *not* be notified of stream creation if invalid token', async () => {
it( 'Should *not* be notified of stream creation if invalid token', async ( ) => {
let eventNum = 0
const query = gql`subscription mySub { userStreamCreated }`
const query = gql `subscription mySub { userStreamCreated }`
const client = createSubscriptionObservable( wsAddr, "faketoken123", query )
const consumer = client.subscribe( eventData => {
expect( eventData.data ).to.not.exist
@@ -235,10 +263,10 @@ describe( 'GraphQL API Subscriptions', ( ) => {
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 0 )
consumer.unsubscribe( )
} )
} ).timeout( 5000 )
it( 'Should *not* be notified of another user stream created', async () => {
const query = gql`subscription mySub { userStreamCreated }`
it( 'Should *not* be notified of another user stream created', async ( ) => {
const query = gql `subscription mySub { userStreamCreated }`
const client = createSubscriptionObservable( wsAddr, userB.token, query )
const consumer = client.subscribe( eventData => {
expect( eventData.data.userStreamCreated ).to.not.exist
@@ -255,12 +283,12 @@ describe( 'GraphQL API Subscriptions', ( ) => {
.expect( noErrors )
await sleep( 1000 ) // we need to wait up a second here
consumer.unsubscribe()
consumer.unsubscribe( )
} )
it( 'Should *not* allow subscribing to stream creation without profile:read scope', async () => {
it( 'Should *not* allow subscribing to stream creation without profile:read scope', async ( ) => {
let eventNum = 0
const query = gql`subscription mySub { userStreamCreated }`
const query = gql `subscription mySub { userStreamCreated }`
const client = createSubscriptionObservable( wsAddr, userC.token, query )
const consumer = client.subscribe( eventData => {
expect( eventData.data.userStreamCreated ).to.not.exist
@@ -282,8 +310,8 @@ describe( 'GraphQL API Subscriptions', ( ) => {
// directive which wraps the entire resolver. it seems that in this case the resolver fully executes and does ping
// the subscriber and increment the eventNum, but ofc does not return a payload if you don't satisfy the directive
expect( eventNum ).to.equal( 2 )
consumer.unsubscribe()
} )
consumer.unsubscribe( )
} ).timeout( 5000 )
} )
describe( 'Branches', ( ) => {
@@ -302,18 +330,20 @@ describe( 'GraphQL API Subscriptions', ( ) => {
await sleep( 500 )
let bc1 = await sendRequest( userA.token, {
query: `mutation { branchCreate ( branch: { streamId: "${streamId}", name: "new branch 🌿", description: "this is a test branch 🌳" } ) }` } )
query: `mutation { branchCreate ( branch: { streamId: "${streamId}", name: "new branch 🌿", description: "this is a test branch 🌳" } ) }`
} )
.expect( 200 )
.expect( noErrors )
let bc2 = await sendRequest( userA.token, {
query: `mutation { branchCreate ( branch: { streamId: "${streamId}", name: "another branch 🥬", description: "this is a test branch 🌳" } ) }` } )
query: `mutation { branchCreate ( branch: { streamId: "${streamId}", name: "another branch 🥬", description: "this is a test branch 🌳" } ) }`
} )
.expect( 200 )
.expect( noErrors )
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 2 )
consumer.unsubscribe( )
} )
} ).timeout( 5000 )
it( 'Should be notified when a branch is updated', async ( ) => {
const resSC = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
@@ -334,18 +364,20 @@ describe( 'GraphQL API Subscriptions', ( ) => {
await sleep( 500 )
let bu1 = await sendRequest( userA.token, {
query: `mutation { branchUpdate ( branch: { streamId: "${streamId}", id: "${branchId}", description: "updating this branch" } ) }` } )
query: `mutation { branchUpdate ( branch: { streamId: "${streamId}", id: "${branchId}", description: "updating this branch" } ) }`
} )
.expect( 200 )
.expect( noErrors )
let bu2 = await sendRequest( userA.token, {
query: `mutation { branchUpdate ( branch: { streamId: "${streamId}", id: "${branchId}", description: "updating this branch v2" } ) }` } )
query: `mutation { branchUpdate ( branch: { streamId: "${streamId}", id: "${branchId}", description: "updating this branch v2" } ) }`
} )
.expect( 200 )
.expect( noErrors )
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 2 )
consumer.unsubscribe( )
} )
} ).timeout( 5000 )
it( 'Should be notified when a branch is deleted', async ( ) => {
const resSC = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
@@ -370,25 +402,27 @@ describe( 'GraphQL API Subscriptions', ( ) => {
await sleep( 500 )
let bd1 = await sendRequest( userA.token, {
query: `mutation { branchDelete ( branch: { streamId: "${streamId}", id: "${bid1}" } ) }` } )
query: `mutation { branchDelete ( branch: { streamId: "${streamId}", id: "${bid1}" } ) }`
} )
.expect( 200 )
.expect( noErrors )
let bd2 = await sendRequest( userA.token, {
query: `mutation { branchDelete ( branch: { streamId: "${streamId}", id: "${bid2}" } ) }` } )
query: `mutation { branchDelete ( branch: { streamId: "${streamId}", id: "${bid2}" } ) }`
} )
.expect( 200 )
.expect( noErrors )
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 2 )
consumer.unsubscribe( )
} )
} ).timeout( 5000 )
it( `Should *not* be notified when a branch is created for a stream you're not authorised for`, async () => {
it( `Should *not* be notified when a branch is created for a stream you're not authorised for`, async ( ) => {
const resSC = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
const streamId = resSC.body.data.streamCreate
let eventNum = 0
const query = gql`subscription { branchCreated( streamId: "${streamId}" ) }`
const query = gql `subscription { branchCreated( streamId: "${streamId}" ) }`
const client = createSubscriptionObservable( wsAddr, userB.token, query )
const consumer = client.subscribe( eventData => {
expect( eventData.data.branchCreated ).to.not.exist
@@ -398,14 +432,15 @@ describe( 'GraphQL API Subscriptions', ( ) => {
await sleep( 500 )
let bc = await sendRequest( userA.token, {
query: `mutation { branchCreate ( branch: { streamId: "${streamId}", name: "new branch 🌿", description: "this is a test branch 🌳" } ) }` } )
query: `mutation { branchCreate ( branch: { streamId: "${streamId}", name: "new branch 🌿", description: "this is a test branch 🌳" } ) }`
} )
.expect( 200 )
.expect( noErrors )
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 0 )
consumer.unsubscribe()
} )
consumer.unsubscribe( )
} ).timeout( 5000 )
} )
describe( 'Commits', ( ) => {
@@ -428,18 +463,20 @@ describe( 'GraphQL API Subscriptions', ( ) => {
await sleep( 500 )
let cc1 = await sendRequest( userA.token, {
query: `mutation { commitCreate ( commit: { streamId: "${streamId}", branchName: "master", objectId: "${objId1}" } ) }` } )
query: `mutation { commitCreate ( commit: { streamId: "${streamId}", branchName: "master", objectId: "${objId1}" } ) }`
} )
.expect( 200 )
.expect( noErrors )
let cc2 = await sendRequest( userA.token, {
query: `mutation { commitCreate ( commit: { streamId: "${streamId}", branchName: "master", objectId: "${objId2}" } ) }` } )
query: `mutation { commitCreate ( commit: { streamId: "${streamId}", branchName: "master", objectId: "${objId2}" } ) }`
} )
.expect( 200 )
.expect( noErrors )
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 2 )
consumer.unsubscribe( )
} )
} ).timeout( 5000 )
it( 'Should be notified when a commit is updated', async ( ) => {
const resSC = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
@@ -460,18 +497,20 @@ describe( 'GraphQL API Subscriptions', ( ) => {
await sleep( 500 )
let cu1 = await sendRequest( userA.token, {
query: `mutation { commitUpdate ( commit: { streamId: "${streamId}", id: "${commitId}", message: "updating this commit" } ) }` } )
query: `mutation { commitUpdate ( commit: { streamId: "${streamId}", id: "${commitId}", message: "updating this commit" } ) }`
} )
.expect( 200 )
.expect( noErrors )
let cu2 = await sendRequest( userA.token, {
query: `mutation { commitUpdate ( commit: { streamId: "${streamId}", id: "${commitId}", message: "updating this commit v2" } ) }` } )
query: `mutation { commitUpdate ( commit: { streamId: "${streamId}", id: "${commitId}", message: "updating this commit v2" } ) }`
} )
.expect( 200 )
.expect( noErrors )
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 2 )
consumer.unsubscribe( )
} )
} ).timeout( 5000 )
it( 'Should be notified when a commit is deleted', async ( ) => {
const resSC = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
@@ -492,23 +531,24 @@ describe( 'GraphQL API Subscriptions', ( ) => {
await sleep( 500 )
let cd = await sendRequest( userA.token, {
query: `mutation { commitDelete ( commit: { streamId: "${streamId}", id: "${commitId}" } ) }` } )
query: `mutation { commitDelete ( commit: { streamId: "${streamId}", id: "${commitId}" } ) }`
} )
.expect( 200 )
.expect( noErrors )
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 1 )
consumer.unsubscribe( )
} )
} ).timeout( 5000 )
it( `Should *not* be notified when a commit is created on a stream you're not authorised for`, async () => {
it( `Should *not* be notified when a commit is created on a stream you're not authorised for`, async ( ) => {
const resSC = await sendRequest( userA.token, { query: `mutation { streamCreate(stream: { name: "Subs Test (u A) Private", description: "Hello World", isPublic:false } ) }` } )
const streamId = resSC.body.data.streamCreate
const resOC = await sendRequest( userA.token, { query: `mutation { objectCreate( objectInput: {streamId: "${streamId}", objects: {hello: "goodbye 🌊"}} ) }` } )
const objId = resOC.body.data.objectCreate
let eventNum = 0
const query = gql`subscription { commitCreated( streamId: "${streamId}" ) }`
const query = gql `subscription { commitCreated( streamId: "${streamId}" ) }`
const client = createSubscriptionObservable( wsAddr, userB.token, query )
const consumer = client.subscribe( eventData => {
expect( eventData.data.commitCreated ).to.not.exist
@@ -518,14 +558,15 @@ describe( 'GraphQL API Subscriptions', ( ) => {
await sleep( 500 )
let cc = await sendRequest( userA.token, {
query: `mutation { commitCreate ( commit: { streamId: "${streamId}", branchName: "master", objectId: "${objId}" } ) }` } )
query: `mutation { commitCreate ( commit: { streamId: "${streamId}", branchName: "master", objectId: "${objId}" } ) }`
} )
.expect( 200 )
.expect( noErrors )
await sleep( 1000 ) // we need to wait up a second here
expect( eventNum ).to.equal( 0 )
consumer.unsubscribe()
} )
consumer.unsubscribe( )
} ).timeout( 5000 )
} )
} )
+2 -2
View File
@@ -129,14 +129,14 @@ async function authorizeResolver( userId, resourceId, requiredRole ) {
let userAclEntry = await knex( role.aclTableName ).select( '*' ).where( { resourceId: resourceId, userId: userId } ).first( )
if ( !userAclEntry ) throw new ForbiddenError( 'You are not authorized' )
if ( !userAclEntry ) throw new ForbiddenError( 'You do not have access to this resource.' )
userAclEntry.role = roles.find( r => r.name === userAclEntry.role )
if ( userAclEntry.role.weight >= role.weight )
return userAclEntry.role.name
else
throw new ForbiddenError( 'You are not authorized' )
throw new ForbiddenError( 'You are not authorized.' )
}
module.exports = {