From c2c95d20c21c71dfd6922c0484b05a660531db17 Mon Sep 17 00:00:00 2001 From: Alessandro Magionami Date: Wed, 9 Oct 2024 09:28:17 +0200 Subject: [PATCH 1/2] Activitystream IoC 1 addStreamCreatedActivity (#3206) * chore(activitystream): addStreamPermissionsAddedActivity refactor multi region * chore(activitystream): addStreamCreatedActivity refactor multi region --- .../activitystream/services/streamActivity.ts | 70 +++++++++++-------- .../core/services/streams/management.ts | 9 ++- 2 files changed, 46 insertions(+), 33 deletions(-) diff --git a/packages/server/modules/activitystream/services/streamActivity.ts b/packages/server/modules/activitystream/services/streamActivity.ts index f31223eb9..4d240c2dc 100644 --- a/packages/server/modules/activitystream/services/streamActivity.ts +++ b/packages/server/modules/activitystream/services/streamActivity.ts @@ -175,38 +175,46 @@ export async function addStreamClonedActivity( /** * Save "user created stream" activity item */ -export async function addStreamCreatedActivity(params: { - streamId: string - creatorId: string - input: StreamCreateInput | ProjectCreateInput - stream: StreamRecord -}) { - const { streamId, creatorId, input, stream } = params +export const addStreamCreatedActivityFactory = + ({ + saveActivity, + publish + }: { + saveActivity: SaveActivity + publish: PublishSubscription + }) => + async (params: { + streamId: string + creatorId: string + input: StreamCreateInput | ProjectCreateInput + stream: StreamRecord + }) => { + const { streamId, creatorId, input, stream } = params - await Promise.all([ - saveActivityFactory({ db })({ - streamId, - resourceType: ResourceTypes.Stream, - resourceId: streamId, - actionType: ActionTypes.Stream.Create, - userId: creatorId, - info: { input }, - message: `Stream ${input.name} created` - }), - pubsub.publish(StreamPubsubEvents.UserStreamAdded, { - userStreamAdded: { id: streamId, ...input }, - ownerId: creatorId - }), - publish(UserSubscriptions.UserProjectsUpdated, { - userProjectsUpdated: { - id: streamId, - type: UserProjectsUpdatedMessageType.Added, - project: stream - }, - ownerId: creatorId - }) - ]) -} + await Promise.all([ + saveActivity({ + streamId, + resourceType: ResourceTypes.Stream, + resourceId: streamId, + actionType: ActionTypes.Stream.Create, + userId: creatorId, + info: { input }, + message: `Stream ${input.name} created` + }), + publish(StreamPubsubEvents.UserStreamAdded, { + userStreamAdded: { id: streamId, ...input }, + ownerId: creatorId + }), + publish(UserSubscriptions.UserProjectsUpdated, { + userProjectsUpdated: { + id: streamId, + type: UserProjectsUpdatedMessageType.Added, + project: stream + }, + ownerId: creatorId + }) + ]) + } /** * Save "stream permissions granted to user" activity item diff --git a/packages/server/modules/core/services/streams/management.ts b/packages/server/modules/core/services/streams/management.ts index 117ed123b..bef1de143 100644 --- a/packages/server/modules/core/services/streams/management.ts +++ b/packages/server/modules/core/services/streams/management.ts @@ -1,6 +1,6 @@ import { MaybeNullOrUndefined, Roles, wait } from '@speckle/shared' import { - addStreamCreatedActivity, + addStreamCreatedActivityFactory, addStreamDeletedActivity, addStreamUpdatedActivity } from '@/modules/activitystream/services/streamActivity' @@ -55,6 +55,8 @@ import { buildCoreInviteEmailContentsFactory } from '@/modules/serverinvites/ser import { getEventBus } from '@/modules/shared/services/eventBus' import { ProjectInviteResourceType } from '@/modules/serverinvites/domain/constants' import { createBranchFactory } from '@/modules/core/repositories/branches' +import { saveActivityFactory } from '@/modules/activitystream/repositories' +import { publish } from '@/modules/shared/utils/subscriptions' export async function createStreamReturnRecord( params: (StreamCreateInput | ProjectCreateInput) & { @@ -112,7 +114,10 @@ export async function createStreamReturnRecord( // Save activity if (createActivity) { - await addStreamCreatedActivity({ + await addStreamCreatedActivityFactory({ + saveActivity: saveActivityFactory({ db }), + publish + })({ streamId, input: params, stream, From a3fb0d7c0edc1a04b75a7eed153fbc70a5df2120 Mon Sep 17 00:00:00 2001 From: Alessandro Magionami Date: Wed, 9 Oct 2024 09:29:24 +0200 Subject: [PATCH 2/2] Activitystream IoC 2 addStreamClonedActivity (#3207) * chore(activitystream): addStreamPermissionsAddedActivity refactor multi region * chore(activitystream): addStreamCreatedActivity refactor multi region * chore(activitystream): addStreamClonedActivity refactor multi region --- .../activitystream/services/streamActivity.ts | 80 ++++++++++--------- .../modules/core/services/streams/clone.ts | 9 ++- 2 files changed, 51 insertions(+), 38 deletions(-) diff --git a/packages/server/modules/activitystream/services/streamActivity.ts b/packages/server/modules/activitystream/services/streamActivity.ts index 4d240c2dc..1e3ca7511 100644 --- a/packages/server/modules/activitystream/services/streamActivity.ts +++ b/packages/server/modules/activitystream/services/streamActivity.ts @@ -131,46 +131,54 @@ export async function addStreamDeletedActivity(params: { /** * Save "user cloned stream X" activity item */ -export async function addStreamClonedActivity( - params: { - sourceStreamId: string - newStream: StreamRecord - clonerId: string - }, - options?: Partial<{ trx: Knex.Transaction }> -) { - const { trx } = options || {} - const { sourceStreamId, newStream, clonerId } = params - const newStreamId = newStream.id +export const addStreamClonedActivityFactory = + ({ + saveActivity, + publish + }: { + saveActivity: SaveActivity + publish: PublishSubscription + }) => + async ( + params: { + sourceStreamId: string + newStream: StreamRecord + clonerId: string + }, + options?: Partial<{ trx: Knex.Transaction }> + ) => { + const { trx } = options || {} + const { sourceStreamId, newStream, clonerId } = params + const newStreamId = newStream.id - const publishSubscriptions = async () => - publish(UserSubscriptions.UserProjectsUpdated, { - userProjectsUpdated: { - id: newStreamId, - type: UserProjectsUpdatedMessageType.Added, - project: newStream - }, - ownerId: clonerId - }) + const publishSubscriptions = async () => + publish(UserSubscriptions.UserProjectsUpdated, { + userProjectsUpdated: { + id: newStreamId, + type: UserProjectsUpdatedMessageType.Added, + project: newStream + }, + ownerId: clonerId + }) - await Promise.all([ - saveActivityFactory({ db })({ - streamId: newStreamId, - resourceType: ResourceTypes.Stream, - resourceId: newStreamId, - actionType: ActionTypes.Stream.Clone, - userId: clonerId, - info: { sourceStreamId, newStreamId, clonerId }, - message: `User ${clonerId} cloned stream ${sourceStreamId} as ${newStreamId}` - }), - !trx ? publishSubscriptions() : null - ]) + await Promise.all([ + saveActivity({ + streamId: newStreamId, + resourceType: ResourceTypes.Stream, + resourceId: newStreamId, + actionType: ActionTypes.Stream.Clone, + userId: clonerId, + info: { sourceStreamId, newStreamId, clonerId }, + message: `User ${clonerId} cloned stream ${sourceStreamId} as ${newStreamId}` + }), + !trx ? publishSubscriptions() : null + ]) - if (trx) { - // can't await this, cause it'll block everything - void trx.executionPromise.then(publishSubscriptions) + if (trx) { + // can't await this, cause it'll block everything + void trx.executionPromise.then(publishSubscriptions) + } } -} /** * Save "user created stream" activity item diff --git a/packages/server/modules/core/services/streams/clone.ts b/packages/server/modules/core/services/streams/clone.ts index 72bf21301..d86dcf05c 100644 --- a/packages/server/modules/core/services/streams/clone.ts +++ b/packages/server/modules/core/services/streams/clone.ts @@ -36,11 +36,13 @@ import { insertCommentsFactory } from '@/modules/comments/repositories/comments' import dayjs from 'dayjs' -import { addStreamClonedActivity } from '@/modules/activitystream/services/streamActivity' import knex, { db } from '@/db/knex' import { Knex } from 'knex' import { InsertCommentPayload } from '@/modules/comments/domain/operations' import { SmartTextEditorValueSchema } from '@/modules/core/services/richTextEditorService' +import { addStreamClonedActivityFactory } from '@/modules/activitystream/services/streamActivity' +import { saveActivityFactory } from '@/modules/activitystream/repositories' +import { publish } from '@/modules/shared/utils/subscriptions' type CloneStreamInitialState = { user: UserWithOptionalRole @@ -426,7 +428,10 @@ export async function cloneStream(userId: string, sourceStreamId: string) { // Clone comments await cloneStreamComments(state, coreCloneResult) // Create activity item - await addStreamClonedActivity( + await addStreamClonedActivityFactory({ + saveActivity: saveActivityFactory({ db }), + publish + })( { sourceStreamId, newStream,