Merge branch 'main' into fabians/core-ioc-28

This commit is contained in:
Kristaps Fabians Geikins
2024-10-09 10:38:11 +03:00
3 changed files with 97 additions and 71 deletions
@@ -131,82 +131,98 @@ export async function addStreamDeletedActivity(params: {
/**
* Save "user cloned stream X" activity item
*/
export async function addStreamClonedActivity(
params: {
sourceStreamId: string
newStream: StreamRecord
clonerId: string
},
options?: Partial<{ trx: Knex.Transaction }>
) {
const { trx } = options || {}
const { sourceStreamId, newStream, clonerId } = params
const newStreamId = newStream.id
export const addStreamClonedActivityFactory =
({
saveActivity,
publish
}: {
saveActivity: SaveActivity
publish: PublishSubscription
}) =>
async (
params: {
sourceStreamId: string
newStream: StreamRecord
clonerId: string
},
options?: Partial<{ trx: Knex.Transaction }>
) => {
const { trx } = options || {}
const { sourceStreamId, newStream, clonerId } = params
const newStreamId = newStream.id
const publishSubscriptions = async () =>
publish(UserSubscriptions.UserProjectsUpdated, {
userProjectsUpdated: {
id: newStreamId,
type: UserProjectsUpdatedMessageType.Added,
project: newStream
},
ownerId: clonerId
})
const publishSubscriptions = async () =>
publish(UserSubscriptions.UserProjectsUpdated, {
userProjectsUpdated: {
id: newStreamId,
type: UserProjectsUpdatedMessageType.Added,
project: newStream
},
ownerId: clonerId
})
await Promise.all([
saveActivityFactory({ db })({
streamId: newStreamId,
resourceType: ResourceTypes.Stream,
resourceId: newStreamId,
actionType: ActionTypes.Stream.Clone,
userId: clonerId,
info: { sourceStreamId, newStreamId, clonerId },
message: `User ${clonerId} cloned stream ${sourceStreamId} as ${newStreamId}`
}),
!trx ? publishSubscriptions() : null
])
await Promise.all([
saveActivity({
streamId: newStreamId,
resourceType: ResourceTypes.Stream,
resourceId: newStreamId,
actionType: ActionTypes.Stream.Clone,
userId: clonerId,
info: { sourceStreamId, newStreamId, clonerId },
message: `User ${clonerId} cloned stream ${sourceStreamId} as ${newStreamId}`
}),
!trx ? publishSubscriptions() : null
])
if (trx) {
// can't await this, cause it'll block everything
void trx.executionPromise.then(publishSubscriptions)
if (trx) {
// can't await this, cause it'll block everything
void trx.executionPromise.then(publishSubscriptions)
}
}
}
/**
* Save "user created stream" activity item
*/
export async function addStreamCreatedActivity(params: {
streamId: string
creatorId: string
input: StreamCreateInput | ProjectCreateInput
stream: StreamRecord
}) {
const { streamId, creatorId, input, stream } = params
export const addStreamCreatedActivityFactory =
({
saveActivity,
publish
}: {
saveActivity: SaveActivity
publish: PublishSubscription
}) =>
async (params: {
streamId: string
creatorId: string
input: StreamCreateInput | ProjectCreateInput
stream: StreamRecord
}) => {
const { streamId, creatorId, input, stream } = params
await Promise.all([
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
actionType: ActionTypes.Stream.Create,
userId: creatorId,
info: { input },
message: `Stream ${input.name} created`
}),
pubsub.publish(StreamPubsubEvents.UserStreamAdded, {
userStreamAdded: { id: streamId, ...input },
ownerId: creatorId
}),
publish(UserSubscriptions.UserProjectsUpdated, {
userProjectsUpdated: {
id: streamId,
type: UserProjectsUpdatedMessageType.Added,
project: stream
},
ownerId: creatorId
})
])
}
await Promise.all([
saveActivity({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
actionType: ActionTypes.Stream.Create,
userId: creatorId,
info: { input },
message: `Stream ${input.name} created`
}),
publish(StreamPubsubEvents.UserStreamAdded, {
userStreamAdded: { id: streamId, ...input },
ownerId: creatorId
}),
publish(UserSubscriptions.UserProjectsUpdated, {
userProjectsUpdated: {
id: streamId,
type: UserProjectsUpdatedMessageType.Added,
project: stream
},
ownerId: creatorId
})
])
}
/**
* Save "stream permissions granted to user" activity item
@@ -32,7 +32,6 @@ import {
insertCommentsFactory
} from '@/modules/comments/repositories/comments'
import dayjs from 'dayjs'
import { addStreamClonedActivity } from '@/modules/activitystream/services/streamActivity'
import knex, { db } from '@/db/knex'
import { Knex } from 'knex'
import { InsertCommentPayload } from '@/modules/comments/domain/operations'
@@ -41,6 +40,9 @@ import {
getBatchedStreamObjectsFactory,
insertObjectsFactory
} from '@/modules/core/repositories/objects'
import { addStreamClonedActivityFactory } from '@/modules/activitystream/services/streamActivity'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { publish } from '@/modules/shared/utils/subscriptions'
type CloneStreamInitialState = {
user: UserWithOptionalRole<UserRecord>
@@ -429,7 +431,10 @@ export async function cloneStream(userId: string, sourceStreamId: string) {
// Clone comments
await cloneStreamComments(state, coreCloneResult)
// Create activity item
await addStreamClonedActivity(
await addStreamClonedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})(
{
sourceStreamId,
newStream,
@@ -1,6 +1,6 @@
import { MaybeNullOrUndefined, Roles, wait } from '@speckle/shared'
import {
addStreamCreatedActivity,
addStreamCreatedActivityFactory,
addStreamDeletedActivity,
addStreamUpdatedActivity
} from '@/modules/activitystream/services/streamActivity'
@@ -55,6 +55,8 @@ import { buildCoreInviteEmailContentsFactory } from '@/modules/serverinvites/ser
import { getEventBus } from '@/modules/shared/services/eventBus'
import { ProjectInviteResourceType } from '@/modules/serverinvites/domain/constants'
import { createBranchFactory } from '@/modules/core/repositories/branches'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { publish } from '@/modules/shared/utils/subscriptions'
export async function createStreamReturnRecord(
params: (StreamCreateInput | ProjectCreateInput) & {
@@ -112,7 +114,10 @@ export async function createStreamReturnRecord(
// Save activity
if (createActivity) {
await addStreamCreatedActivity({
await addStreamCreatedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})({
streamId,
input: params,
stream,