chore(server): refactor activityStream invocations - batch #2 - accessRequest

This commit is contained in:
Kristaps Fabians Geikins
2025-01-22 15:41:45 +02:00
parent c49887578c
commit 90e94d9cd7
4 changed files with 137 additions and 129 deletions
@@ -0,0 +1,128 @@
import { EventBusListen, EventPayload } from '@/modules/shared/services/eventBus'
import {
AddStreamAccessRequestDeclinedActivity,
AddStreamAccessRequestedActivity,
SaveActivity
} from '@/modules/activitystream/domain/operations'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'
import {
AccessRequestType,
isStreamAccessRequest
} from '@/modules/accessrequests/repositories'
/**
* Save a "stream access requested" activity
*/
const addStreamAccessRequestedActivityFactory =
({
saveActivity
}: {
saveActivity: SaveActivity
}): AddStreamAccessRequestedActivity =>
async (params: { streamId: string; requesterId: string }) => {
const { streamId, requesterId } = params
await saveActivity({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
userId: requesterId,
actionType: ActionTypes.Stream.AccessRequestSent,
message: `User ${requesterId} has requested access to stream ${streamId}`,
info: { requesterId }
})
}
/**
* Save a "stream acccess request declined/denied" activity
*/
const addStreamAccessRequestDeclinedActivityFactory =
({
saveActivity
}: {
saveActivity: SaveActivity
}): AddStreamAccessRequestDeclinedActivity =>
async (params: { streamId: string; requesterId: string; declinerId: string }) => {
const { streamId, requesterId, declinerId } = params
await saveActivity({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
userId: declinerId,
actionType: ActionTypes.Stream.AccessRequestDeclined,
message: `User ${declinerId} declined access to stream ${streamId} for user ${requesterId}`,
info: { requesterId, declinerId }
})
}
const onServerAccessRequestCreatedFactory =
({
addStreamAccessRequestedActivity
}: {
addStreamAccessRequestedActivity: AddStreamAccessRequestedActivity
}) =>
async (payload: EventPayload<typeof AccessRequestEvents.Created>) => {
const {
request: { resourceId, requesterId }
} = payload.payload
if (!isStreamAccessRequest(payload.payload.request)) return
if (!resourceId) return
await addStreamAccessRequestedActivity({
streamId: resourceId,
requesterId
})
}
const onServerAccessRequestFinalizedFactory =
({
addStreamAccessRequestDeclinedActivity
}: {
addStreamAccessRequestDeclinedActivity: AddStreamAccessRequestDeclinedActivity
}) =>
async (payload: EventPayload<typeof AccessRequestEvents.Finalized>) => {
const {
approved,
finalizedBy,
request: { resourceId, resourceType, requesterId }
} = payload.payload
if (!resourceId) return
if (resourceType === AccessRequestType.Stream) {
// If user was added to stream, an activity stream item was already added from 'addOrUpdateStreamCollaborator'
if (approved) return
await addStreamAccessRequestDeclinedActivity({
streamId: resourceId,
requesterId,
declinerId: finalizedBy
})
}
}
export const reportAccessRequestActivityFactory =
(deps: { eventListen: EventBusListen; saveActivity: SaveActivity }) => () => {
const addStreamAccessRequestedActivity =
addStreamAccessRequestedActivityFactory(deps)
const addStreamAccessRequestDeclinedActivity =
addStreamAccessRequestDeclinedActivityFactory(deps)
const onServerAccessRequestCreated = onServerAccessRequestCreatedFactory({
addStreamAccessRequestedActivity
})
const onServerAccessRequestFinalized = onServerAccessRequestFinalizedFactory({
addStreamAccessRequestDeclinedActivity
})
const quitters = [
deps.eventListen(AccessRequestEvents.Created, async (payload) => {
if (!isStreamAccessRequest(payload.payload.request)) return
return await onServerAccessRequestCreated(payload)
}),
deps.eventListen(AccessRequestEvents.Finalized, async (payload) => {
if (!isStreamAccessRequest(payload.payload.request)) return
await onServerAccessRequestFinalized(payload)
})
]
return () => quitters.forEach((quit) => quit())
}
@@ -14,10 +14,6 @@ import {
addStreamInviteSentOutActivityFactory
} from '@/modules/activitystream/services/streamActivity'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import {
addStreamAccessRequestDeclinedActivityFactory,
addStreamAccessRequestedActivityFactory
} from '@/modules/activitystream/services/accessRequestActivity'
import { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations'
import { scheduleExecutionFactory } from '@/modules/core/services/taskScheduler'
import {
@@ -25,18 +21,13 @@ import {
releaseTaskLockFactory
} from '@/modules/core/repositories/scheduledTasks'
import { Knex } from 'knex'
import {
onServerAccessRequestCreatedFactory,
onServerAccessRequestFinalizedFactory,
onServerInviteCreatedFactory
} from '@/modules/activitystream/services/eventListener'
import { onServerInviteCreatedFactory } from '@/modules/activitystream/services/eventListener'
import { isProjectResourceTarget } from '@/modules/serverinvites/helpers/core'
import { publish } from '@/modules/shared/utils/subscriptions'
import { isStreamAccessRequest } from '@/modules/accessrequests/repositories'
import { ServerInvitesEvents } from '@/modules/serverinvites/domain/events'
import { ProjectEvents } from '@/modules/core/domain/projects/events'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'
import { reportUserActivityFactory } from '@/modules/activitystream/events/userListeners'
import { reportAccessRequestActivityFactory } from '@/modules/activitystream/events/accessRequestListeners'
let scheduledTask: ReturnType<ScheduleExecution> | null = null
let quitEventListeners: Optional<() => void> = undefined
@@ -57,25 +48,14 @@ const initializeEventListeners = ({
eventListen: eventBus.listen,
saveActivity
})
const reportAccessRequestActivity = reportAccessRequestActivityFactory({
eventListen: eventBus.listen,
saveActivity
})
const quitCbs = [
reportUserActivity(),
eventBus.listen(AccessRequestEvents.Created, async (payload) => {
if (!isStreamAccessRequest(payload.payload.request)) return
return await onServerAccessRequestCreatedFactory({
addStreamAccessRequestedActivity: addStreamAccessRequestedActivityFactory({
saveActivity: saveActivityFactory({ db })
})
})(payload)
}),
eventBus.listen(AccessRequestEvents.Finalized, async (payload) => {
if (!isStreamAccessRequest(payload.payload.request)) return
await onServerAccessRequestFinalizedFactory({
addStreamAccessRequestDeclinedActivity:
addStreamAccessRequestDeclinedActivityFactory({
saveActivity: saveActivityFactory({ db })
})
})(payload)
}),
reportAccessRequestActivity(),
eventBus.listen(ServerInvitesEvents.Created, async ({ payload }) => {
if (!isProjectResourceTarget(payload.invite.resource)) return
await onServerInviteCreatedFactory({
@@ -1,45 +0,0 @@
import {
AddStreamAccessRequestedActivity,
SaveActivity
} from '@/modules/activitystream/domain/operations'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
/**
* Save a "stream access requested" activity
*/
export const addStreamAccessRequestedActivityFactory =
({
saveActivity
}: {
saveActivity: SaveActivity
}): AddStreamAccessRequestedActivity =>
async (params: { streamId: string; requesterId: string }) => {
const { streamId, requesterId } = params
await saveActivity({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
userId: requesterId,
actionType: ActionTypes.Stream.AccessRequestSent,
message: `User ${requesterId} has requested access to stream ${streamId}`,
info: { requesterId }
})
}
/**
* Save a "stream acccess request declined/denied" activity
*/
export const addStreamAccessRequestDeclinedActivityFactory =
({ saveActivity }: { saveActivity: SaveActivity }) =>
async (params: { streamId: string; requesterId: string; declinerId: string }) => {
const { streamId, requesterId, declinerId } = params
await saveActivity({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
userId: declinerId,
actionType: ActionTypes.Stream.AccessRequestDeclined,
message: `User ${declinerId} declined access to stream ${streamId} for user ${requesterId}`,
info: { requesterId, declinerId }
})
}
@@ -1,14 +1,5 @@
import { Logger } from '@/logging/logging'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'
import {
AccessRequestType,
isStreamAccessRequest
} from '@/modules/accessrequests/repositories'
import {
AddStreamAccessRequestDeclinedActivity,
AddStreamAccessRequestedActivity,
AddStreamInviteSentOutActivity
} from '@/modules/activitystream/domain/operations'
import { AddStreamInviteSentOutActivity } from '@/modules/activitystream/domain/operations'
import { GetStream } from '@/modules/core/domain/streams/operations'
import {
ServerInvitesEvents,
@@ -18,52 +9,6 @@ import {
isProjectResourceTarget,
resolveTarget
} from '@/modules/serverinvites/helpers/core'
import { EventPayload } from '@/modules/shared/services/eventBus'
export const onServerAccessRequestCreatedFactory =
({
addStreamAccessRequestedActivity
}: {
addStreamAccessRequestedActivity: AddStreamAccessRequestedActivity
}) =>
async (payload: EventPayload<typeof AccessRequestEvents.Created>) => {
const {
request: { resourceId, requesterId }
} = payload.payload
if (!isStreamAccessRequest(payload.payload.request)) return
if (!resourceId) return
await addStreamAccessRequestedActivity({
streamId: resourceId,
requesterId
})
}
export const onServerAccessRequestFinalizedFactory =
({
addStreamAccessRequestDeclinedActivity
}: {
addStreamAccessRequestDeclinedActivity: AddStreamAccessRequestDeclinedActivity
}) =>
async (payload: EventPayload<typeof AccessRequestEvents.Finalized>) => {
const {
approved,
finalizedBy,
request: { resourceId, resourceType, requesterId }
} = payload.payload
if (!resourceId) return
if (resourceType === AccessRequestType.Stream) {
// If user was added to stream, an activity stream item was already added from 'addOrUpdateStreamCollaborator'
if (approved) return
await addStreamAccessRequestDeclinedActivity({
streamId: resourceId,
requesterId,
declinerId: finalizedBy
})
}
}
export const onServerInviteCreatedFactory =
({