chore(activitystream): refactor saveActivity

This commit is contained in:
Alessandro Magionami
2024-09-26 10:34:27 +02:00
parent b887911b56
commit 07a6778fb0
14 changed files with 143 additions and 150 deletions
@@ -133,3 +133,5 @@ export type GetUserActivity = ({
cursor: string | null
items: StreamActivityRecord[]
}>
export type SaveActivity = (args: Omit<StreamActivityRecord, 'time'>) => Promise<void>
@@ -5,7 +5,7 @@ export type StreamActivityRecord = {
time: Date
resourceType: Nullable<(typeof ResourceTypes)[keyof typeof ResourceTypes]>
resourceId: Nullable<string>
actionType: Nullable<AllActivityTypes>
actionType: AllActivityTypes
userId: Nullable<string>
info: Nullable<Record<string, unknown>>
message: Nullable<string>
@@ -8,7 +8,8 @@ import {
GetStreamActivity,
GetTimelineCount,
GetUserActivity,
GetUserTimeline
GetUserTimeline,
SaveActivity
} from '@/modules/activitystream/domain/operations'
import {
StreamActivityRecord,
@@ -17,6 +18,11 @@ import {
import { StreamAcl, StreamActivity } from '@/modules/core/dbSchema'
import { Roles } from '@/modules/core/helpers/mainConstants'
import { StreamAclRecord } from '@/modules/core/helpers/types'
import { getStream } from '@/modules/core/repositories/streams'
import { getServerInfo } from '@/modules/core/services/generic'
import { getUser } from '@/modules/core/repositories/users'
import { createWebhookEventFactory } from '@/modules/webhooks/repositories/webhooks'
import { dispatchStreamEventFactory } from '@/modules/webhooks/services/webhooks'
import { Knex } from 'knex'
const tables = {
@@ -213,3 +219,47 @@ export const getUserActivityFactory =
cursor: results.length > 0 ? results[results.length - 1].time.toISOString() : null
}
}
// TODO: this function should be a service
export const saveActivityFactory =
({ db }: { db: Knex }): SaveActivity =>
async ({ streamId, resourceType, resourceId, actionType, userId, info, message }) => {
const dbObject = {
streamId, // abc
resourceType, // "commit"
resourceId, // commit id
actionType, // "commit_receive"
userId, // populated by the api
info: JSON.stringify(info), // can be anything with conventions! (TBD)
message // something human understandable for frontend purposes mostly
}
await tables
.streamActivity<Omit<StreamActivityRecord, 'info'> & { info: string }>(db)
.insert(dbObject)
if (streamId) {
const webhooksPayload = {
streamId,
userId,
activityMessage: message,
event: {
// eslint-disable-next-line camelcase
event_name: actionType,
data: info
}
}
await dispatchStreamEventFactory({
db,
getServerInfo,
getStream,
createWebhookEvent: createWebhookEventFactory({ db }),
getUser
})({
streamId,
event: actionType,
eventPayload: webhooksPayload
})
}
}
@@ -1,6 +1,6 @@
import { saveActivity } from '@/modules/activitystream/services'
import { db } from '@/db/knex'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
/**
* Save a "stream access requested" activity
@@ -10,7 +10,7 @@ export async function addStreamAccessRequestedActivity(params: {
requesterId: string
}) {
const { streamId, requesterId } = params
await saveActivity({
await saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
@@ -30,7 +30,7 @@ export async function addStreamAccessRequestDeclinedActivity(params: {
declinerId: string
}) {
const { streamId, requesterId, declinerId } = params
await saveActivity({
await saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
@@ -1,4 +1,3 @@
import { saveActivity } from '@/modules/activitystream/services'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
import { BranchRecord } from '@/modules/core/helpers/types'
import {
@@ -14,6 +13,8 @@ import {
} from '@/modules/core/graph/generated/graphql'
import { ProjectSubscriptions, publish } from '@/modules/shared/utils/subscriptions'
import { isBranchDeleteInput, isBranchUpdateInput } from '@/modules/core/helpers/branch'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { db } from '@/db/knex'
/**
* Save "branch created" activity
@@ -22,7 +23,7 @@ export async function addBranchCreatedActivity(params: { branch: BranchRecord })
const { branch } = params
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId: branch.streamId,
resourceType: ResourceTypes.Branch,
resourceId: branch.id,
@@ -56,7 +57,7 @@ export async function addBranchUpdatedActivity(params: {
const streamId = isBranchUpdateInput(update) ? update.streamId : update.projectId
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Branch,
resourceId: update.id,
@@ -90,7 +91,7 @@ export async function addBranchDeletedActivity(params: {
const streamId = isBranchDeleteInput(input) ? input.streamId : input.projectId
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Branch,
resourceId: input.id,
@@ -1,6 +1,6 @@
import { db } from '@/db/knex'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
import { saveActivity } from '@/modules/activitystream/services'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { ViewerResourceItem } from '@/modules/comments/domain/types'
import { CommentRecord } from '@/modules/comments/helpers/types'
import { getCommentsResourcesFactory } from '@/modules/comments/repositories/comments'
@@ -96,7 +96,7 @@ export async function addCommentCreatedActivity(params: {
}
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
resourceId: comment.id,
streamId,
resourceType: ResourceTypes.Comment,
@@ -156,7 +156,7 @@ export async function addCommentArchivedActivity(params: {
})
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Comment,
resourceId: commentId,
@@ -221,7 +221,7 @@ export async function addReplyAddedActivity(params: {
? input.parentComment
: input.threadId
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Comment,
resourceId: parentCommentId,
@@ -1,4 +1,3 @@
import { saveActivity } from '@/modules/activitystream/services'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
import {
CommitSubscriptions as CommitPubsubEvents,
@@ -14,6 +13,8 @@ import {
import { CommitRecord } from '@/modules/core/helpers/types'
import { ProjectSubscriptions, publish } from '@/modules/shared/utils/subscriptions'
import { has } from 'lodash'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { db } from '@/db/knex'
/**
* Save "new commit created" activity item
@@ -29,7 +30,7 @@ export async function addCommitCreatedActivity(params: {
}) {
const { commitId, input, streamId, userId, branchName, commit } = params
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Commit,
resourceId: commitId,
@@ -84,7 +85,7 @@ export async function addCommitUpdatedActivity(params: {
}
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Commit,
resourceId: commitId,
@@ -120,7 +121,7 @@ export async function addCommitMovedActivity(params: {
}) {
const { commitId, streamId, userId, originalBranchId, newBranchId, commit } = params
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Commit,
resourceId: commitId,
@@ -150,7 +151,7 @@ export async function addCommitDeletedActivity(params: {
}) {
const { commitId, streamId, userId, commit, branchId } = params
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Commit,
resourceId: commitId,
@@ -181,7 +182,7 @@ export async function addCommitReceivedActivity(params: {
}) {
const { input, userId } = params
await saveActivity({
await saveActivityFactory({ db })({
streamId: input.streamId,
resourceType: ResourceTypes.Commit,
resourceId: input.commitId,
@@ -1,10 +1,11 @@
import { db } from '@/db/knex'
import {
AccessRequestsEmitter,
AccessRequestsEvents,
AccessRequestsEventsPayloads
} from '@/modules/accessrequests/events/emitter'
import { AccessRequestType } from '@/modules/accessrequests/repositories'
import { saveActivity } from '@/modules/activitystream/services'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import {
addStreamAccessRequestDeclinedActivity,
addStreamAccessRequestedActivity
@@ -18,7 +19,7 @@ import {
async function onUserCreated(payload: UsersEventsPayloads[UsersEvents.Created]) {
const { user } = payload
await saveActivity({
await saveActivityFactory({ db })({
streamId: null,
resourceType: 'user',
resourceId: user.id,
@@ -1,65 +0,0 @@
'use strict'
const knex = require('@/db/knex')
const { dispatchStreamEventFactory } = require('@/modules/webhooks/services/webhooks')
const { getStream } = require('@/modules/core/repositories/streams')
const {
createWebhookEventFactory
} = require('@/modules/webhooks/repositories/webhooks')
const { getUser } = require('@/modules/core/repositories/users')
const { getServerInfo } = require('@/modules/core/services/generic')
const StreamActivity = () => knex('stream_activity')
module.exports = {
/**
* @param {Omit<import('@/modules/activitystream/helpers/types').StreamActivityRecord, "time">} param0
* @param {{trx?: import('knex').Knex.Transaction}} param1
*/
async saveActivity(
{ streamId, resourceType, resourceId, actionType, userId, info, message },
{ trx } = {}
) {
const dbObject = {
streamId, // abc
resourceType, // "commit"
resourceId, // commit id
actionType, // "commit_receive"
userId, // populated by the api
info: JSON.stringify(info), // can be anything with conventions! (TBD)
message // something human understandable for frontend purposes mostly
}
const q = StreamActivity().insert(dbObject)
if (trx) q.transacting(trx)
await q
if (streamId) {
const webhooksPayload = {
streamId,
userId,
activityMessage: message,
event: {
// eslint-disable-next-line camelcase
event_name: actionType,
data: info
}
}
await dispatchStreamEventFactory({
db: trx ?? knex.db,
getServerInfo,
getStream,
createWebhookEvent: createWebhookEventFactory({ db: knex.db }),
getUser
})(
{
streamId,
event: actionType,
eventPayload: webhooksPayload
},
{ trx }
)
}
}
}
@@ -1,4 +1,3 @@
import { saveActivity } from '@/modules/activitystream/services'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
import { StreamRoles } from '@/modules/core/helpers/mainConstants'
import {
@@ -22,6 +21,8 @@ import {
publish,
UserSubscriptions
} from '@/modules/shared/utils/subscriptions'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { db } from '@/db/knex'
/**
* Save "stream updated" activity
@@ -36,7 +37,7 @@ export async function addStreamUpdatedActivity(params: {
const { streamId, updaterId, oldStream, update, newStream } = params
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
@@ -109,7 +110,7 @@ export async function addStreamDeletedActivity(params: {
)
}
await saveActivity({
await saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
@@ -146,18 +147,15 @@ export async function addStreamClonedActivity(
})
await Promise.all([
saveActivity(
{
streamId: newStreamId,
resourceType: ResourceTypes.Stream,
resourceId: newStreamId,
actionType: ActionTypes.Stream.Clone,
userId: clonerId,
info: { sourceStreamId, newStreamId, clonerId },
message: `User ${clonerId} cloned stream ${sourceStreamId} as ${newStreamId}`
},
{ trx }
),
saveActivityFactory({ db })({
streamId: newStreamId,
resourceType: ResourceTypes.Stream,
resourceId: newStreamId,
actionType: ActionTypes.Stream.Clone,
userId: clonerId,
info: { sourceStreamId, newStreamId, clonerId },
message: `User ${clonerId} cloned stream ${sourceStreamId} as ${newStreamId}`
}),
!trx ? publishSubscriptions() : null
])
@@ -179,7 +177,7 @@ export async function addStreamCreatedActivity(params: {
const { streamId, creatorId, input, stream } = params
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
@@ -215,7 +213,7 @@ export async function addStreamPermissionsAddedActivity(params: {
}) {
const { streamId, activityUserId, targetUserId, role, stream } = params
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
@@ -261,7 +259,7 @@ export async function addStreamInviteAcceptedActivity(params: {
}) {
const { streamId, inviteTargetId, inviterId, role, stream } = params
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
@@ -308,7 +306,7 @@ export async function addStreamPermissionsRevokedActivity(params: {
const isVoluntaryLeave = activityUserId === removedUserId
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
@@ -358,7 +356,7 @@ export async function addStreamInviteSentOutActivity(params: {
const targetDisplay = inviteTargetId || inviteTargetEmail
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
@@ -388,7 +386,7 @@ export async function addStreamInviteDeclinedActivity(params: {
}) {
const { streamId, inviteTargetId, inviterId, stream } = params
await Promise.all([
saveActivity({
saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Stream,
resourceId: streamId,
@@ -418,7 +416,7 @@ export async function addStreamCommentMentionActivity(params: {
threadId: string
}) {
const { streamId, mentionAuthorId, mentionTargetId, commentId, threadId } = params
await saveActivity({
await saveActivityFactory({ db })({
streamId,
resourceType: ResourceTypes.Comment,
resourceId: commentId,
@@ -1,7 +1,8 @@
import { UserUpdateInput } from '@/modules/core/graph/generated/graphql'
import { UserRecord } from '@/modules/core/helpers/types'
import { saveActivity } from '@/modules/activitystream/services'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { db } from '@/db/knex'
export async function addUserUpdatedActivity(params: {
oldUser: UserRecord
@@ -10,7 +11,7 @@ export async function addUserUpdatedActivity(params: {
}) {
const { oldUser, update, updaterId } = params
await saveActivity({
await saveActivityFactory({ db })({
streamId: null,
resourceType: ResourceTypes.User,
resourceId: oldUser.id,
@@ -7,7 +7,6 @@ import {
} from '@/modules/activitystream/services/summary'
import { expect } from 'chai'
import { createStream, deleteStream } from '@/modules/core/services/streams'
import { saveActivity } from '@/modules/activitystream/services'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
import {
ActivityDigestMessage,
@@ -15,11 +14,15 @@ import {
NotificationTypeMessageMap
} from '@/modules/notifications/helpers/types'
import { sleep } from '@/test/helpers'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { db } from '@/db/knex'
const cleanup = async () => {
await truncateTables([StreamActivity.name, Users.name])
}
const saveActivity = saveActivityFactory({ db })
describe('Activity summary @activity', () => {
const userA: BasicTestUser = {
name: 'd1',
@@ -7,7 +7,6 @@ const {
changeUserRole
} = require('@/modules/core/services/users')
const { updateUserAndNotify } = require('@/modules/core/services/users/management')
const { saveActivity } = require('@/modules/activitystream/services')
const { ActionTypes } = require('@/modules/activitystream/helpers/types')
const { validateScopes } = require(`@/modules/shared`)
const zxcvbn = require('zxcvbn')
@@ -27,6 +26,7 @@ const {
} = require('@/modules/serverinvites/repositories/serverInvites')
const db = require('@/db/knex')
const { BadRequestError } = require('@/modules/shared/errors')
const { saveActivityFactory } = require('@/modules/activitystream/repositories')
/** @type {import('@/modules/core/graph/generated/graphql').Resolvers} */
module.exports = {
@@ -185,7 +185,7 @@ module.exports = {
deleteAllUserInvites: deleteAllUserInvitesFactory({ db })
})(context.userId, args.user)
await saveActivity({
await saveActivityFactory({ db })({
streamId: null,
resourceType: 'user',
resourceId: context.userId,
@@ -20,6 +20,7 @@ import {
UserWithOptionalRole
} from '@/modules/core/repositories/users'
import { Knex } from 'knex'
import { ServerInfo } from '@/modules/core/helpers/types'
const MAX_STREAM_WEBHOOKS = 100
@@ -113,47 +114,48 @@ export const dispatchStreamEventFactory =
createWebhookEvent: CreateWebhookEvent
getUser: typeof getUserFn
}) =>
async (
{
streamId,
event,
eventPayload
}: {
streamId: string
event: string
eventPayload: {
server: { id?: number; canonicalUrl?: string }
streamId?: string
stream?: StreamWithOptionalRole
userId?: string
user: Partial<UserWithOptionalRole> | null
webhook: Webhook
}
},
{ trx }: { trx?: Knex.Transaction } = {}
) => {
async ({
streamId,
event,
eventPayload
}: {
streamId: string
event: string
eventPayload: {
streamId?: string
stream?: StreamWithOptionalRole
userId?: string | null
user?: Partial<UserWithOptionalRole> | null
}
}) => {
const payload: typeof eventPayload & {
server: Partial<Omit<ServerInfo, 'secret'>>
} = {
...eventPayload,
server: { ...(await getServerInfo()), canonicalUrl: process.env.CANONICAL_URL }
}
// Add server info
eventPayload.server = await getServerInfo()
eventPayload.server.canonicalUrl = process.env.CANONICAL_URL
delete eventPayload.server.id
payload.server = await getServerInfo()
payload.server.canonicalUrl = process.env.CANONICAL_URL
delete payload.server.id
// Add stream info
if (eventPayload.streamId) {
eventPayload.stream = await getStream(
if (payload.streamId) {
payload.stream = await getStream(
{
streamId: eventPayload.streamId,
userId: eventPayload.userId
streamId: payload.streamId,
userId: payload.userId ?? undefined
},
{ trx }
{ trx: db.isTransaction ? await db.transaction() : undefined }
)
}
// Add user info (except email and pwd)
if (eventPayload.userId) {
eventPayload.user = await getUser(eventPayload.userId)
if (eventPayload.user) {
delete eventPayload.user.passwordDigest
delete eventPayload.user.email
if (payload.userId) {
payload.user = await getUser(payload.userId)
if (payload.user) {
delete payload.user.passwordDigest
delete payload.user.email
}
}
@@ -170,14 +172,13 @@ export const dispatchStreamEventFactory =
if (!(event in wh.triggers)) continue
// Add webhook info (the key `webhook` will be replaced for each webhook configured, before serializing the payload and storing it)
eventPayload.webhook = wh
eventPayload.webhook.triggers = Object.keys(eventPayload.webhook.triggers)
delete eventPayload.webhook.secret
wh.triggers = Object.keys(wh.triggers)
delete wh.secret
await createWebhookEvent({
id: crs({ length: 20 }),
webhookId: wh.id,
payload: JSON.stringify(eventPayload)
payload: JSON.stringify({ ...payload, webhook: wh })
})
}
}