Merge pull request #3769 from specklesystems/fabians/web-2414-3

chore(server): event bus refactor - access req emitter - batch #3
This commit is contained in:
Kristaps Fabians Geikins
2025-01-13 14:00:48 +02:00
committed by GitHub
11 changed files with 104 additions and 83 deletions
@@ -0,0 +1,26 @@
import { ServerAccessRequestRecord } from '@/modules/accessrequests/repositories'
import { StreamRoles } from '@speckle/shared'
export const accessRequestEventsNamespace = 'accessrequests' as const
export const AccessRequestEvents = {
Created: `${accessRequestEventsNamespace}.created`,
Finalized: `${accessRequestEventsNamespace}.finalized`
} as const
export type AccessRequestEventsPayloads = {
[AccessRequestEvents.Created]: { request: ServerAccessRequestRecord }
[AccessRequestEvents.Finalized]: {
request: ServerAccessRequestRecord
/**
* ID of the user that finalized this request
*/
finalizedBy: string
/**
* If this object is set, request was approved
*/
approved?: {
role: StreamRoles
}
}
}
@@ -1,31 +0,0 @@
import { ServerAccessRequestRecord } from '@/modules/accessrequests/repositories'
import { StreamRoles } from '@/modules/core/helpers/mainConstants'
import { initializeModuleEventEmitter } from '@/modules/shared/services/moduleEventEmitterSetup'
export enum AccessRequestsEvents {
Created = 'created',
Finalized = 'finalized'
}
export type AccessRequestsEventsPayloads = {
[AccessRequestsEvents.Created]: { request: ServerAccessRequestRecord }
[AccessRequestsEvents.Finalized]: {
request: ServerAccessRequestRecord
/**
* ID of the user that finalized this request
*/
finalizedBy: string
/**
* If this object is set, request was approved
*/
approved?: {
role: StreamRoles
}
}
}
const { emit, listen } = initializeModuleEventEmitter<AccessRequestsEventsPayloads>({
moduleName: 'accessrequests'
})
export const AccessRequestsEmitter = { emit, listen, events: AccessRequestsEvents }
@@ -1,5 +1,4 @@
import { db } from '@/db/knex'
import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter'
import {
AccessRequestType,
createNewRequestFactory,
@@ -36,6 +35,7 @@ import {
} from '@/modules/core/services/streams/access'
import { authorizeResolver } from '@/modules/shared'
import { LogicError } from '@/modules/shared/errors'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { publish } from '@/modules/shared/utils/subscriptions'
const getUser = getUserFactory({ db })
@@ -52,7 +52,7 @@ const requestProjectAccess = requestProjectAccessFactory({
getUserStreamAccessRequest,
getStream,
createNewRequest: createNewRequestFactory({ db }),
accessRequestsEmitter: AccessRequestsEmitter.emit
emitEvent: getEventBus().emit
})
const requestStreamAccess = requestStreamAccessFactory({
@@ -90,7 +90,7 @@ const processPendingStreamRequest = processPendingStreamRequestFactory({
validateStreamAccess,
addOrUpdateStreamCollaborator,
deleteRequestById: deleteRequestByIdFactory({ db }),
accessRequestsEmitter: AccessRequestsEmitter.emit
emitEvent: getEventBus().emit
})
const processPendingProjectRequest = processPendingStreamRequest
@@ -1,10 +1,10 @@
import { db } from '@/db/knex'
import { moduleLogger } from '@/logging/logging'
import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter'
import { initializeEventListenerFactory } from '@/modules/accessrequests/services/eventListener'
import { getStreamCollaboratorsFactory } from '@/modules/core/repositories/streams'
import { publishNotification } from '@/modules/notifications/services/publication'
import { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { getEventBus } from '@/modules/shared/services/eventBus'
let quitListeners: Optional<() => void> = undefined
@@ -16,7 +16,7 @@ const ServerAccessRequestsModule: SpeckleModule = {
const initializeEventListener = initializeEventListenerFactory({
getStreamCollaborators: getStreamCollaboratorsFactory({ db }),
publishNotification,
accessRequestsEventListener: AccessRequestsEmitter.listen
eventBus: getEventBus()
})
quitListeners = initializeEventListener()
}
@@ -1,8 +1,4 @@
import {
AccessRequestsEmitter,
AccessRequestsEvents,
AccessRequestsEventsPayloads
} from '@/modules/accessrequests/events/emitter'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'
import { isStreamAccessRequest } from '@/modules/accessrequests/repositories'
import { GetStreamCollaborators } from '@/modules/core/domain/streams/operations'
import { Roles } from '@/modules/core/helpers/mainConstants'
@@ -10,6 +6,7 @@ import {
NotificationPublisher,
NotificationType
} from '@/modules/notifications/helpers/types'
import { EventBus, EventPayload } from '@/modules/shared/services/eventBus'
type OnServerAccessRequestCreatedDeps = {
getStreamCollaborators: GetStreamCollaborators
@@ -18,8 +15,8 @@ type OnServerAccessRequestCreatedDeps = {
const onServerAccessRequestCreatedFactory =
(deps: OnServerAccessRequestCreatedDeps) =>
async (payload: AccessRequestsEventsPayloads[AccessRequestsEvents.Created]) => {
const { request } = payload
async (payload: EventPayload<typeof AccessRequestEvents.Created>) => {
const { request } = payload.payload
// Send out email to all owners of the stream
if (isStreamAccessRequest(request)) {
@@ -46,8 +43,8 @@ type OnServerAccessRequestFinalizedDeps = {
const onServerAccessRequestFinalizedFactory =
(deps: OnServerAccessRequestFinalizedDeps) =>
async (payload: AccessRequestsEventsPayloads[AccessRequestsEvents.Finalized]) => {
const { approved, request, finalizedBy } = payload
async (payload: EventPayload<typeof AccessRequestEvents.Finalized>) => {
const { approved, request, finalizedBy } = payload.payload
// Send out email to requester, if accepted
if (approved && isStreamAccessRequest(request)) {
@@ -67,7 +64,7 @@ const onServerAccessRequestFinalizedFactory =
export const initializeEventListenerFactory =
(
deps: {
accessRequestsEventListener: (typeof AccessRequestsEmitter)['listen']
eventBus: EventBus
} & OnServerAccessRequestCreatedDeps &
OnServerAccessRequestFinalizedDeps
) =>
@@ -76,12 +73,9 @@ export const initializeEventListenerFactory =
const onServerAccessRequestFinalized = onServerAccessRequestFinalizedFactory(deps)
const quitCbs = [
deps.accessRequestsEventListener(
AccessRequestsEvents.Created,
onServerAccessRequestCreated
),
deps.accessRequestsEventListener(
AccessRequestsEvents.Finalized,
deps.eventBus.listen(AccessRequestEvents.Created, onServerAccessRequestCreated),
deps.eventBus.listen(
AccessRequestEvents.Finalized,
onServerAccessRequestFinalized
)
]
@@ -2,7 +2,6 @@ import {
AccessRequestCreationError,
AccessRequestProcessingError
} from '@/modules/accessrequests/errors'
import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter'
import { StreamAccessRequestGraphQLReturn } from '@/modules/accessrequests/helpers/graphTypes'
import {
AccessRequestType,
@@ -35,6 +34,8 @@ import {
GetStream,
ValidateStreamAccess
} from '@/modules/core/domain/streams/operations'
import { EventBusEmit } from '@/modules/shared/services/eventBus'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'
function buildStreamAccessRequestGraphQLReturn(
record: ServerAccessRequestRecord<AccessRequestType.Stream, string>
@@ -85,7 +86,7 @@ export const requestProjectAccessFactory =
getUserStreamAccessRequest: GetUserStreamAccessRequest
getStream: GetStream
createNewRequest: CreateNewRequest
accessRequestsEmitter: (typeof AccessRequestsEmitter)['emit']
emitEvent: EventBusEmit
}): RequestProjectAccess =>
async (userId: string, projectId: string) => {
const [stream, existingRequest] = await Promise.all([
@@ -121,8 +122,11 @@ export const requestProjectAccessFactory =
resourceId: projectId
})
await deps.accessRequestsEmitter(AccessRequestsEmitter.events.Created, {
request: req
await deps.emitEvent({
eventName: AccessRequestEvents.Created,
payload: {
request: req
}
})
return req
@@ -168,7 +172,7 @@ export const processPendingStreamRequestFactory =
validateStreamAccess: ValidateStreamAccess
addOrUpdateStreamCollaborator: AddOrUpdateStreamCollaborator
deleteRequestById: DeleteRequestById
accessRequestsEmitter: (typeof AccessRequestsEmitter)['emit']
emitEvent: EventBusEmit
}) =>
async (
userId: string,
@@ -216,10 +220,13 @@ export const processPendingStreamRequestFactory =
await deps.deleteRequestById(req.id)
await deps.accessRequestsEmitter(AccessRequestsEmitter.events.Finalized, {
request: req,
approved: accept ? { role } : undefined,
finalizedBy: userId
await deps.emitEvent({
eventName: AccessRequestEvents.Finalized,
payload: {
request: req,
approved: accept ? { role } : undefined,
finalizedBy: userId
}
})
return req
@@ -1,5 +1,5 @@
import { db } from '@/db/knex'
import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'
import {
createNewRequestFactory,
deleteRequestByIdFactory,
@@ -42,6 +42,7 @@ import {
} from '@/modules/core/services/streams/access'
import { NotificationType } from '@/modules/notifications/helpers/types'
import { authorizeResolver } from '@/modules/shared'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { publish } from '@/modules/shared/utils/subscriptions'
import { BasicTestUser, createTestUsers } from '@/test/authHelper'
import {
@@ -75,7 +76,7 @@ const requestProjectAccess = requestProjectAccessFactory({
}),
getStream,
createNewRequest: createNewRequestFactory({ db }),
accessRequestsEmitter: AccessRequestsEmitter.emit
emitEvent: getEventBus().emit
})
const saveActivity = saveActivityFactory({ db })
const validateStreamAccess = validateStreamAccessFactory({ authorizeResolver })
@@ -162,6 +163,8 @@ describe('Project access requests', () => {
id: ''
}
let quitters: (() => void)[] = []
before(async () => {
await cleanup()
await createTestUsers([me, otherGuy, anotherGuy])
@@ -176,6 +179,11 @@ describe('Project access requests', () => {
notificationsStateManager = buildNotificationsStateTracker()
})
afterEach(() => {
quitters.forEach((q) => q())
quitters = []
})
after(async () => {
notificationsStateManager.destroy()
})
@@ -226,6 +234,13 @@ describe('Project access requests', () => {
})
it('operation succeeds', async () => {
let eventFired = false
quitters.push(
getEventBus().listen(AccessRequestEvents.Created, async (payload) => {
expect(payload.payload.request.requesterId).to.eq(me.id)
eventFired = true
})
)
const sendEmailCall = EmailSendingServiceMock.hijackFunction(
'sendEmail',
async () => true
@@ -267,6 +282,7 @@ describe('Project access requests', () => {
userId: me.id
})
expect(streamActivity).to.have.lengthOf(1)
expect(eventFired).to.be.true
})
it('operation fails if request already exists', async () => {
@@ -447,6 +463,15 @@ describe('Project access requests', () => {
]
validProcessingDataSet.forEach(({ display, accept, role }) => {
it(`${display} works`, async () => {
let eventFired = false
quitters.push(
getEventBus().listen(AccessRequestEvents.Finalized, async (payload) => {
expect(!!payload.payload.approved).to.eq(accept)
expect(payload.payload.finalizedBy).to.eq(me.id)
eventFired = true
})
)
const results = await useReq(validReqId, accept, role)
expect(results).to.not.haveGraphQLErrors()
expect(results.data?.projectMutations.accessRequestMutations.use).to.be.ok
@@ -477,6 +502,7 @@ describe('Project access requests', () => {
})
expect(streamActivity).to.have.lengthOf(1)
}
expect(eventFired).to.be.true
})
})
})
@@ -1,6 +1,5 @@
import { buildApolloServer } from '@/app'
import { db } from '@/db/knex'
import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter'
import {
createNewRequestFactory,
deleteRequestByIdFactory,
@@ -44,6 +43,7 @@ import {
} from '@/modules/core/services/streams/access'
import { NotificationType } from '@/modules/notifications/helpers/types'
import { authorizeResolver } from '@/modules/shared'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { publish } from '@/modules/shared/utils/subscriptions'
import { BasicTestUser, createTestUsers } from '@/test/authHelper'
import {
@@ -78,7 +78,7 @@ const requestStreamAccess = requestStreamAccessFactory({
}),
getStream,
createNewRequest: createNewRequestFactory({ db }),
accessRequestsEmitter: AccessRequestsEmitter.emit
emitEvent: getEventBus().emit
})
})
const saveActivity = saveActivityFactory({ db })
@@ -31,16 +31,13 @@ import {
onServerInviteCreatedFactory,
onUserCreatedFactory
} from '@/modules/activitystream/services/eventListener'
import {
AccessRequestsEmitter,
AccessRequestsEvents
} from '@/modules/accessrequests/events/emitter'
import { isProjectResourceTarget } from '@/modules/serverinvites/helpers/core'
import { publish } from '@/modules/shared/utils/subscriptions'
import { isStreamAccessRequest } from '@/modules/accessrequests/repositories'
import { ServerInvitesEvents } from '@/modules/serverinvites/domain/events'
import { ProjectEvents } from '@/modules/core/domain/projects/events'
import { UserEvents } from '@/modules/core/domain/users/events'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'
let scheduledTask: ReturnType<ScheduleExecution> | null = null
let quitEventListeners: Optional<() => void> = undefined
@@ -62,16 +59,16 @@ const initializeEventListeners = ({
// this activity will always go in the main DB
onUserCreatedFactory({ saveActivity: saveActivityFactory({ db }) })
),
AccessRequestsEmitter.listen(AccessRequestsEvents.Created, async ({ request }) => {
if (!isStreamAccessRequest(request)) return
eventBus.listen(AccessRequestEvents.Created, async (payload) => {
if (!isStreamAccessRequest(payload.payload.request)) return
return await onServerAccessRequestCreatedFactory({
addStreamAccessRequestedActivity: addStreamAccessRequestedActivityFactory({
saveActivity: saveActivityFactory({ db })
})
})({ request })
})(payload)
}),
AccessRequestsEmitter.listen(AccessRequestsEvents.Finalized, async (payload) => {
if (!isStreamAccessRequest(payload.request)) return
eventBus.listen(AccessRequestEvents.Finalized, async (payload) => {
if (!isStreamAccessRequest(payload.payload.request)) return
await onServerAccessRequestFinalizedFactory({
addStreamAccessRequestDeclinedActivity:
addStreamAccessRequestDeclinedActivityFactory({
@@ -1,8 +1,5 @@
import { Logger } from '@/logging/logging'
import {
AccessRequestsEvents,
AccessRequestsEventsPayloads
} from '@/modules/accessrequests/events/emitter'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'
import {
AccessRequestType,
isStreamAccessRequest
@@ -47,11 +44,11 @@ export const onServerAccessRequestCreatedFactory =
}: {
addStreamAccessRequestedActivity: AddStreamAccessRequestedActivity
}) =>
async (payload: AccessRequestsEventsPayloads[AccessRequestsEvents.Created]) => {
async (payload: EventPayload<typeof AccessRequestEvents.Created>) => {
const {
request: { resourceId, requesterId }
} = payload
if (!isStreamAccessRequest(payload.request)) return
} = payload.payload
if (!isStreamAccessRequest(payload.payload.request)) return
if (!resourceId) return
await addStreamAccessRequestedActivity({
@@ -66,12 +63,12 @@ export const onServerAccessRequestFinalizedFactory =
}: {
addStreamAccessRequestDeclinedActivity: AddStreamAccessRequestDeclinedActivity
}) =>
async (payload: AccessRequestsEventsPayloads[AccessRequestsEvents.Finalized]) => {
async (payload: EventPayload<typeof AccessRequestEvents.Finalized>) => {
const {
approved,
finalizedBy,
request: { resourceId, resourceType, requesterId }
} = payload
} = payload.payload
if (!resourceId) return
if (resourceType === AccessRequestType.Stream) {
@@ -31,6 +31,10 @@ import {
versionEventsNamespace,
VersionEventsPayloads
} from '@/modules/core/domain/commits/events'
import {
accessRequestEventsNamespace,
AccessRequestEventsPayloads
} from '@/modules/accessrequests/domain/events'
type AllEventsWildcard = '**'
type EventWildcard = '*'
@@ -55,6 +59,7 @@ type EventsByNamespace = {
[projectEventsNamespace]: ProjectEventsPayloads
[userEventsNamespace]: UserEventsPayloads
[versionEventsNamespace]: VersionEventsPayloads
[accessRequestEventsNamespace]: AccessRequestEventsPayloads
}
type EventTypes = UnionToIntersection<EventsByNamespace[keyof EventsByNamespace]>