From 90e94d9cd7e18074bb47bc217bcfcf951707c5c1 Mon Sep 17 00:00:00 2001 From: Kristaps Fabians Geikins Date: Wed, 22 Jan 2025 15:41:45 +0200 Subject: [PATCH] chore(server): refactor activityStream invocations - batch #2 - accessRequest --- .../events/accessRequestListeners.ts | 128 ++++++++++++++++++ .../server/modules/activitystream/index.ts | 36 ++--- .../services/accessRequestActivity.ts | 45 ------ .../activitystream/services/eventListener.ts | 57 +------- 4 files changed, 137 insertions(+), 129 deletions(-) create mode 100644 packages/server/modules/activitystream/events/accessRequestListeners.ts delete mode 100644 packages/server/modules/activitystream/services/accessRequestActivity.ts diff --git a/packages/server/modules/activitystream/events/accessRequestListeners.ts b/packages/server/modules/activitystream/events/accessRequestListeners.ts new file mode 100644 index 000000000..7237afc3b --- /dev/null +++ b/packages/server/modules/activitystream/events/accessRequestListeners.ts @@ -0,0 +1,128 @@ +import { EventBusListen, EventPayload } from '@/modules/shared/services/eventBus' +import { + AddStreamAccessRequestDeclinedActivity, + AddStreamAccessRequestedActivity, + SaveActivity +} from '@/modules/activitystream/domain/operations' +import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types' +import { AccessRequestEvents } from '@/modules/accessrequests/domain/events' +import { + AccessRequestType, + isStreamAccessRequest +} from '@/modules/accessrequests/repositories' + +/** + * Save a "stream access requested" activity + */ +const addStreamAccessRequestedActivityFactory = + ({ + saveActivity + }: { + saveActivity: SaveActivity + }): AddStreamAccessRequestedActivity => + async (params: { streamId: string; requesterId: string }) => { + const { streamId, requesterId } = params + await saveActivity({ + streamId, + resourceType: ResourceTypes.Stream, + resourceId: streamId, + userId: requesterId, + actionType: ActionTypes.Stream.AccessRequestSent, + message: `User ${requesterId} has requested access to stream ${streamId}`, + info: { requesterId } + }) + } + +/** + * Save a "stream acccess request declined/denied" activity + */ +const addStreamAccessRequestDeclinedActivityFactory = + ({ + saveActivity + }: { + saveActivity: SaveActivity + }): AddStreamAccessRequestDeclinedActivity => + async (params: { streamId: string; requesterId: string; declinerId: string }) => { + const { streamId, requesterId, declinerId } = params + await saveActivity({ + streamId, + resourceType: ResourceTypes.Stream, + resourceId: streamId, + userId: declinerId, + actionType: ActionTypes.Stream.AccessRequestDeclined, + message: `User ${declinerId} declined access to stream ${streamId} for user ${requesterId}`, + info: { requesterId, declinerId } + }) + } + +const onServerAccessRequestCreatedFactory = + ({ + addStreamAccessRequestedActivity + }: { + addStreamAccessRequestedActivity: AddStreamAccessRequestedActivity + }) => + async (payload: EventPayload) => { + const { + request: { resourceId, requesterId } + } = payload.payload + if (!isStreamAccessRequest(payload.payload.request)) return + if (!resourceId) return + + await addStreamAccessRequestedActivity({ + streamId: resourceId, + requesterId + }) + } + +const onServerAccessRequestFinalizedFactory = + ({ + addStreamAccessRequestDeclinedActivity + }: { + addStreamAccessRequestDeclinedActivity: AddStreamAccessRequestDeclinedActivity + }) => + async (payload: EventPayload) => { + const { + approved, + finalizedBy, + request: { resourceId, resourceType, requesterId } + } = payload.payload + if (!resourceId) return + + if (resourceType === AccessRequestType.Stream) { + // If user was added to stream, an activity stream item was already added from 'addOrUpdateStreamCollaborator' + if (approved) return + + await addStreamAccessRequestDeclinedActivity({ + streamId: resourceId, + requesterId, + declinerId: finalizedBy + }) + } + } + +export const reportAccessRequestActivityFactory = + (deps: { eventListen: EventBusListen; saveActivity: SaveActivity }) => () => { + const addStreamAccessRequestedActivity = + addStreamAccessRequestedActivityFactory(deps) + const addStreamAccessRequestDeclinedActivity = + addStreamAccessRequestDeclinedActivityFactory(deps) + const onServerAccessRequestCreated = onServerAccessRequestCreatedFactory({ + addStreamAccessRequestedActivity + }) + const onServerAccessRequestFinalized = onServerAccessRequestFinalizedFactory({ + addStreamAccessRequestDeclinedActivity + }) + + const quitters = [ + deps.eventListen(AccessRequestEvents.Created, async (payload) => { + if (!isStreamAccessRequest(payload.payload.request)) return + return await onServerAccessRequestCreated(payload) + }), + deps.eventListen(AccessRequestEvents.Finalized, async (payload) => { + if (!isStreamAccessRequest(payload.payload.request)) return + await onServerAccessRequestFinalized(payload) + }) + ] + + return () => quitters.forEach((quit) => quit()) + } diff --git a/packages/server/modules/activitystream/index.ts b/packages/server/modules/activitystream/index.ts index 1aebbc331..820d99d42 100644 --- a/packages/server/modules/activitystream/index.ts +++ b/packages/server/modules/activitystream/index.ts @@ -14,10 +14,6 @@ import { addStreamInviteSentOutActivityFactory } from '@/modules/activitystream/services/streamActivity' import { getStreamFactory } from '@/modules/core/repositories/streams' -import { - addStreamAccessRequestDeclinedActivityFactory, - addStreamAccessRequestedActivityFactory -} from '@/modules/activitystream/services/accessRequestActivity' import { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations' import { scheduleExecutionFactory } from '@/modules/core/services/taskScheduler' import { @@ -25,18 +21,13 @@ import { releaseTaskLockFactory } from '@/modules/core/repositories/scheduledTasks' import { Knex } from 'knex' -import { - onServerAccessRequestCreatedFactory, - onServerAccessRequestFinalizedFactory, - onServerInviteCreatedFactory -} from '@/modules/activitystream/services/eventListener' +import { onServerInviteCreatedFactory } from '@/modules/activitystream/services/eventListener' import { isProjectResourceTarget } from '@/modules/serverinvites/helpers/core' import { publish } from '@/modules/shared/utils/subscriptions' -import { isStreamAccessRequest } from '@/modules/accessrequests/repositories' import { ServerInvitesEvents } from '@/modules/serverinvites/domain/events' import { ProjectEvents } from '@/modules/core/domain/projects/events' -import { AccessRequestEvents } from '@/modules/accessrequests/domain/events' import { reportUserActivityFactory } from '@/modules/activitystream/events/userListeners' +import { reportAccessRequestActivityFactory } from '@/modules/activitystream/events/accessRequestListeners' let scheduledTask: ReturnType | null = null let quitEventListeners: Optional<() => void> = undefined @@ -57,25 +48,14 @@ const initializeEventListeners = ({ eventListen: eventBus.listen, saveActivity }) + const reportAccessRequestActivity = reportAccessRequestActivityFactory({ + eventListen: eventBus.listen, + saveActivity + }) + const quitCbs = [ reportUserActivity(), - eventBus.listen(AccessRequestEvents.Created, async (payload) => { - if (!isStreamAccessRequest(payload.payload.request)) return - return await onServerAccessRequestCreatedFactory({ - addStreamAccessRequestedActivity: addStreamAccessRequestedActivityFactory({ - saveActivity: saveActivityFactory({ db }) - }) - })(payload) - }), - eventBus.listen(AccessRequestEvents.Finalized, async (payload) => { - if (!isStreamAccessRequest(payload.payload.request)) return - await onServerAccessRequestFinalizedFactory({ - addStreamAccessRequestDeclinedActivity: - addStreamAccessRequestDeclinedActivityFactory({ - saveActivity: saveActivityFactory({ db }) - }) - })(payload) - }), + reportAccessRequestActivity(), eventBus.listen(ServerInvitesEvents.Created, async ({ payload }) => { if (!isProjectResourceTarget(payload.invite.resource)) return await onServerInviteCreatedFactory({ diff --git a/packages/server/modules/activitystream/services/accessRequestActivity.ts b/packages/server/modules/activitystream/services/accessRequestActivity.ts deleted file mode 100644 index 3963d7c9a..000000000 --- a/packages/server/modules/activitystream/services/accessRequestActivity.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { - AddStreamAccessRequestedActivity, - SaveActivity -} from '@/modules/activitystream/domain/operations' -import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types' - -/** - * Save a "stream access requested" activity - */ -export const addStreamAccessRequestedActivityFactory = - ({ - saveActivity - }: { - saveActivity: SaveActivity - }): AddStreamAccessRequestedActivity => - async (params: { streamId: string; requesterId: string }) => { - const { streamId, requesterId } = params - await saveActivity({ - streamId, - resourceType: ResourceTypes.Stream, - resourceId: streamId, - userId: requesterId, - actionType: ActionTypes.Stream.AccessRequestSent, - message: `User ${requesterId} has requested access to stream ${streamId}`, - info: { requesterId } - }) - } - -/** - * Save a "stream acccess request declined/denied" activity - */ -export const addStreamAccessRequestDeclinedActivityFactory = - ({ saveActivity }: { saveActivity: SaveActivity }) => - async (params: { streamId: string; requesterId: string; declinerId: string }) => { - const { streamId, requesterId, declinerId } = params - await saveActivity({ - streamId, - resourceType: ResourceTypes.Stream, - resourceId: streamId, - userId: declinerId, - actionType: ActionTypes.Stream.AccessRequestDeclined, - message: `User ${declinerId} declined access to stream ${streamId} for user ${requesterId}`, - info: { requesterId, declinerId } - }) - } diff --git a/packages/server/modules/activitystream/services/eventListener.ts b/packages/server/modules/activitystream/services/eventListener.ts index 3dd878b91..fa7595b1b 100644 --- a/packages/server/modules/activitystream/services/eventListener.ts +++ b/packages/server/modules/activitystream/services/eventListener.ts @@ -1,14 +1,5 @@ import { Logger } from '@/logging/logging' -import { AccessRequestEvents } from '@/modules/accessrequests/domain/events' -import { - AccessRequestType, - isStreamAccessRequest -} from '@/modules/accessrequests/repositories' -import { - AddStreamAccessRequestDeclinedActivity, - AddStreamAccessRequestedActivity, - AddStreamInviteSentOutActivity -} from '@/modules/activitystream/domain/operations' +import { AddStreamInviteSentOutActivity } from '@/modules/activitystream/domain/operations' import { GetStream } from '@/modules/core/domain/streams/operations' import { ServerInvitesEvents, @@ -18,52 +9,6 @@ import { isProjectResourceTarget, resolveTarget } from '@/modules/serverinvites/helpers/core' -import { EventPayload } from '@/modules/shared/services/eventBus' - -export const onServerAccessRequestCreatedFactory = - ({ - addStreamAccessRequestedActivity - }: { - addStreamAccessRequestedActivity: AddStreamAccessRequestedActivity - }) => - async (payload: EventPayload) => { - const { - request: { resourceId, requesterId } - } = payload.payload - if (!isStreamAccessRequest(payload.payload.request)) return - if (!resourceId) return - - await addStreamAccessRequestedActivity({ - streamId: resourceId, - requesterId - }) - } - -export const onServerAccessRequestFinalizedFactory = - ({ - addStreamAccessRequestDeclinedActivity - }: { - addStreamAccessRequestDeclinedActivity: AddStreamAccessRequestDeclinedActivity - }) => - async (payload: EventPayload) => { - const { - approved, - finalizedBy, - request: { resourceId, resourceType, requesterId } - } = payload.payload - if (!resourceId) return - - if (resourceType === AccessRequestType.Stream) { - // If user was added to stream, an activity stream item was already added from 'addOrUpdateStreamCollaborator' - if (approved) return - - await addStreamAccessRequestDeclinedActivity({ - streamId: resourceId, - requesterId, - declinerId: finalizedBy - }) - } - } export const onServerInviteCreatedFactory = ({