Merge pull request #3010 from specklesystems/alessandro/web-957-dispatch-webhook-event
Alessandro/web 957 dispatch webhook event
This commit is contained in:
@@ -2,7 +2,13 @@
|
||||
|
||||
const knex = require('@/db/knex')
|
||||
|
||||
const { dispatchStreamEvent } = require('../../webhooks/services/webhooks')
|
||||
const { dispatchStreamEventFactory } = require('@/modules/webhooks/services/webhooks')
|
||||
const { getStream } = require('@/modules/core/repositories/streams')
|
||||
const {
|
||||
createWebhookEventFactory
|
||||
} = require('@/modules/webhooks/repositories/webhooks')
|
||||
const { getUser } = require('@/modules/core/repositories/users')
|
||||
const { getServerInfo } = require('@/modules/core/services/generic')
|
||||
const StreamActivity = () => knex('stream_activity')
|
||||
const StreamAcl = () => knex('stream_acl')
|
||||
|
||||
@@ -41,7 +47,13 @@ module.exports = {
|
||||
}
|
||||
}
|
||||
|
||||
await dispatchStreamEvent(
|
||||
await dispatchStreamEventFactory({
|
||||
db: trx ?? knex.db,
|
||||
getServerInfo,
|
||||
getStream,
|
||||
createWebhookEvent: createWebhookEventFactory({ db: knex.db }),
|
||||
getUser
|
||||
})(
|
||||
{
|
||||
streamId,
|
||||
event: actionType,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { Webhook } from '@/modules/webhooks/domain/types'
|
||||
import { Webhook, WebhookEvent } from '@/modules/webhooks/domain/types'
|
||||
|
||||
export type CreateWebhook = (
|
||||
export type CreateWebhookConfig = (
|
||||
webhook: Pick<
|
||||
Webhook,
|
||||
'id' | 'streamId' | 'url' | 'description' | 'secret' | 'enabled' | 'triggers'
|
||||
@@ -13,7 +13,7 @@ export type CountWebhooksByStreamId = ({
|
||||
|
||||
export type GetWebhookById = ({ id }: Pick<Webhook, 'id'>) => Promise<Webhook | null>
|
||||
|
||||
export type UpdateWebhook = ({
|
||||
export type UpdateWebhookConfig = ({
|
||||
webhookId,
|
||||
webhookInput
|
||||
}: {
|
||||
@@ -27,8 +27,26 @@ export type UpdateWebhook = ({
|
||||
>
|
||||
}) => Promise<string>
|
||||
|
||||
export type DeleteWebhook = ({ id }: Pick<Webhook, 'id'>) => Promise<number>
|
||||
export type DeleteWebhookConfig = ({ id }: Pick<Webhook, 'id'>) => Promise<number>
|
||||
|
||||
export type GetStreamWebhooks = ({
|
||||
streamId
|
||||
}: Pick<Webhook, 'streamId'>) => Promise<Webhook[]>
|
||||
|
||||
export type CreateWebhookEvent = (
|
||||
event: Pick<WebhookEvent, 'id' | 'payload' | 'webhookId'>
|
||||
) => Promise<string>
|
||||
|
||||
export type GetLastWebhookEvents = ({
|
||||
webhookId,
|
||||
limit
|
||||
}: {
|
||||
webhookId: string
|
||||
limit?: number
|
||||
}) => Promise<WebhookEvent[]>
|
||||
|
||||
export type GetWebhookEventsCount = ({
|
||||
webhookId
|
||||
}: {
|
||||
webhookId: string
|
||||
}) => Promise<number>
|
||||
|
||||
@@ -9,3 +9,13 @@ export type Webhook = {
|
||||
createdAt: Date
|
||||
updatedAt: Date
|
||||
}
|
||||
|
||||
export type WebhookEvent = {
|
||||
id: string
|
||||
webhookId: string
|
||||
status: number
|
||||
statusInfo: string
|
||||
retryCount: number
|
||||
lastUpdate: Date
|
||||
payload: string
|
||||
}
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
const {
|
||||
getLastWebhookEvents,
|
||||
getWebhookEventsCount
|
||||
} = require('../../services/webhooks')
|
||||
|
||||
module.exports = {
|
||||
Webhook: {
|
||||
async history(parent, args) {
|
||||
const items = await getLastWebhookEvents({
|
||||
webhookId: parent.id,
|
||||
limit: args.limit
|
||||
})
|
||||
const totalCount = await getWebhookEventsCount({ webhookId: parent.id })
|
||||
|
||||
return { items, totalCount }
|
||||
}
|
||||
}
|
||||
}
|
||||
+27
-14
@@ -1,18 +1,20 @@
|
||||
import { Resolvers } from '@/modules/core/graph/generated/graphql'
|
||||
import { authorizeResolver } from '@/modules/shared'
|
||||
import {
|
||||
createWebhook,
|
||||
deleteWebhook,
|
||||
updateWebhook
|
||||
} from '@/modules/webhooks/services/webhooks-new'
|
||||
createWebhookFactory,
|
||||
deleteWebhookFactory,
|
||||
updateWebhookFactory
|
||||
} from '@/modules/webhooks/services/webhooks'
|
||||
import { Roles } from '@speckle/shared'
|
||||
import {
|
||||
countWebhooksByStreamIdFactory,
|
||||
createWebhookFactory,
|
||||
deleteWebhookFactory,
|
||||
createWebhookConfigFactory,
|
||||
deleteWebhookConfigFactory,
|
||||
getLastWebhookEventsFactory,
|
||||
getStreamWebhooksFactory,
|
||||
getWebhookByIdFactory,
|
||||
updateWebhookFactory
|
||||
getWebhookEventsCountFactory,
|
||||
updateWebhookConfigFactory
|
||||
} from '@/modules/webhooks/repositories/webhooks'
|
||||
import { db } from '@/db/knex'
|
||||
import { ForbiddenError } from '@/modules/shared/errors'
|
||||
@@ -43,7 +45,18 @@ const streamWebhooksResolver = async (
|
||||
export = {
|
||||
Webhook: {
|
||||
projectId: (parent) => parent.streamId,
|
||||
hasSecret: (parent) => !!parent.secret?.length
|
||||
hasSecret: (parent) => !!parent.secret?.length,
|
||||
history: async (parent, args) => {
|
||||
const items = await getLastWebhookEventsFactory({ db })({
|
||||
webhookId: parent.id,
|
||||
limit: args.limit
|
||||
})
|
||||
const totalCount = await getWebhookEventsCountFactory({ db })({
|
||||
webhookId: parent.id
|
||||
})
|
||||
|
||||
return { items, totalCount }
|
||||
}
|
||||
},
|
||||
Stream: {
|
||||
webhooks: streamWebhooksResolver
|
||||
@@ -60,8 +73,8 @@ export = {
|
||||
context.resourceAccessRules
|
||||
)
|
||||
|
||||
const id = await createWebhook({
|
||||
createWebhookConfig: createWebhookFactory({ db }),
|
||||
const id = await createWebhookFactory({
|
||||
createWebhookConfig: createWebhookConfigFactory({ db }),
|
||||
countWebhooksByStreamId: countWebhooksByStreamIdFactory({ db })
|
||||
})({
|
||||
streamId: args.webhook.streamId,
|
||||
@@ -88,8 +101,8 @@ export = {
|
||||
'The webhook id and stream id do not match. Please check your inputs.'
|
||||
)
|
||||
|
||||
const updated = await updateWebhook({
|
||||
updateWebhookConfig: updateWebhookFactory({ db })
|
||||
const updated = await updateWebhookFactory({
|
||||
updateWebhookConfig: updateWebhookConfigFactory({ db })
|
||||
})({
|
||||
id: args.webhook.id,
|
||||
url: args.webhook.url,
|
||||
@@ -109,8 +122,8 @@ export = {
|
||||
context.resourceAccessRules
|
||||
)
|
||||
|
||||
return await deleteWebhook({
|
||||
deleteWebhookConfig: deleteWebhookFactory({ db }),
|
||||
return await deleteWebhookFactory({
|
||||
deleteWebhookConfig: deleteWebhookConfigFactory({ db }),
|
||||
getWebhookById: getWebhookByIdFactory({ db })
|
||||
})(args.webhook)
|
||||
}
|
||||
@@ -1,19 +1,22 @@
|
||||
import { Knex } from 'knex'
|
||||
import { Webhook } from '@/modules/webhooks/domain/types'
|
||||
import { Webhook, WebhookEvent } from '@/modules/webhooks/domain/types'
|
||||
import {
|
||||
CountWebhooksByStreamId,
|
||||
CreateWebhook,
|
||||
DeleteWebhook,
|
||||
CreateWebhookConfig,
|
||||
CreateWebhookEvent,
|
||||
DeleteWebhookConfig,
|
||||
GetLastWebhookEvents,
|
||||
GetStreamWebhooks,
|
||||
GetWebhookById,
|
||||
UpdateWebhook
|
||||
GetWebhookEventsCount,
|
||||
UpdateWebhookConfig
|
||||
} from '@/modules/webhooks/domain/operations'
|
||||
|
||||
type WebhookConfig = Omit<Webhook, 'triggers'> & { triggers: Record<string, true> }
|
||||
|
||||
const tables = (db: Knex) => ({
|
||||
webhooksConfigs: db<WebhookConfig>('webhooks_config'),
|
||||
webhooksEvents: db('webhooks_events')
|
||||
webhooksEvents: db<WebhookEvent>('webhooks_events')
|
||||
})
|
||||
|
||||
const toTriggersObj = (triggers: string[]): Record<string, true> =>
|
||||
@@ -22,8 +25,8 @@ const toTriggersObj = (triggers: string[]): Record<string, true> =>
|
||||
const toTriggersArray = (triggers: Record<string, true>): string[] =>
|
||||
Object.keys(triggers)
|
||||
|
||||
export const createWebhookFactory =
|
||||
({ db }: { db: Knex }): CreateWebhook =>
|
||||
export const createWebhookConfigFactory =
|
||||
({ db }: { db: Knex }): CreateWebhookConfig =>
|
||||
async ({ id, streamId, url, description, secret, enabled, triggers }) => {
|
||||
const triggersObj = toTriggersObj(triggers)
|
||||
|
||||
@@ -59,8 +62,8 @@ export const getWebhookByIdFactory =
|
||||
return { ...webhook, triggers: toTriggersArray(webhook.triggers) }
|
||||
}
|
||||
|
||||
export const updateWebhookFactory =
|
||||
({ db }: { db: Knex }): UpdateWebhook =>
|
||||
export const updateWebhookConfigFactory =
|
||||
({ db }: { db: Knex }): UpdateWebhookConfig =>
|
||||
async ({ webhookId, webhookInput }) => {
|
||||
const { triggers, ...update } = webhookInput
|
||||
let triggersObj: Record<string, true> | undefined
|
||||
@@ -75,8 +78,8 @@ export const updateWebhookFactory =
|
||||
return webhookId
|
||||
}
|
||||
|
||||
export const deleteWebhookFactory =
|
||||
({ db }: { db: Knex }): DeleteWebhook =>
|
||||
export const deleteWebhookConfigFactory =
|
||||
({ db }: { db: Knex }): DeleteWebhookConfig =>
|
||||
async ({ id }) => {
|
||||
return await tables(db).webhooksConfigs.where({ id }).del()
|
||||
}
|
||||
@@ -94,3 +97,31 @@ export const getStreamWebhooksFactory =
|
||||
triggers: toTriggersArray(webhook.triggers)
|
||||
}))
|
||||
}
|
||||
|
||||
export const createWebhookEventFactory =
|
||||
({ db }: { db: Knex }): CreateWebhookEvent =>
|
||||
async (event) => {
|
||||
return await tables(db).webhooksEvents.insert(event).returning('id')
|
||||
}
|
||||
|
||||
export const getLastWebhookEventsFactory =
|
||||
({ db }: { db: Knex }): GetLastWebhookEvents =>
|
||||
async ({ webhookId, limit }) => {
|
||||
if (!limit) {
|
||||
limit = 100
|
||||
}
|
||||
|
||||
return await tables(db)
|
||||
.webhooksEvents.select('*')
|
||||
.where({ webhookId })
|
||||
.orderBy('lastUpdate', 'desc')
|
||||
.limit(limit)
|
||||
}
|
||||
|
||||
export const getWebhookEventsCountFactory =
|
||||
({ db }: { db: Knex }): GetWebhookEventsCount =>
|
||||
async ({ webhookId }) => {
|
||||
const [res] = await tables(db).webhooksEvents.count().where({ webhookId })
|
||||
|
||||
return parseInt(res.count.toString())
|
||||
}
|
||||
|
||||
@@ -1,89 +0,0 @@
|
||||
import { ForbiddenError } from '@/modules/shared/errors'
|
||||
import {
|
||||
CountWebhooksByStreamId,
|
||||
CreateWebhook,
|
||||
DeleteWebhook,
|
||||
GetWebhookById,
|
||||
UpdateWebhook
|
||||
} from '@/modules/webhooks/domain/operations'
|
||||
import { Webhook } from '@/modules/webhooks/domain/types'
|
||||
import { SetValuesNullable } from '@speckle/shared'
|
||||
import crs from 'crypto-random-string'
|
||||
|
||||
const MAX_STREAM_WEBHOOKS = 100
|
||||
|
||||
export const createWebhook =
|
||||
({
|
||||
createWebhookConfig,
|
||||
countWebhooksByStreamId
|
||||
}: {
|
||||
createWebhookConfig: CreateWebhook
|
||||
countWebhooksByStreamId: CountWebhooksByStreamId
|
||||
}) =>
|
||||
async ({
|
||||
streamId,
|
||||
url,
|
||||
description,
|
||||
secret,
|
||||
enabled,
|
||||
triggers
|
||||
}: Pick<Webhook, 'streamId' | 'enabled' | 'triggers'> &
|
||||
Partial<SetValuesNullable<Pick<Webhook, 'url' | 'description' | 'secret'>>>) => {
|
||||
const streamWebhookCount = await countWebhooksByStreamId({ streamId })
|
||||
if (streamWebhookCount >= MAX_STREAM_WEBHOOKS) {
|
||||
throw new Error(
|
||||
`Maximum number of webhooks for a stream reached (${MAX_STREAM_WEBHOOKS})`
|
||||
)
|
||||
}
|
||||
|
||||
return await createWebhookConfig({
|
||||
id: crs({ length: 10 }),
|
||||
streamId,
|
||||
url: url ?? undefined,
|
||||
description: description ?? undefined,
|
||||
secret: secret ?? undefined,
|
||||
enabled,
|
||||
triggers
|
||||
})
|
||||
}
|
||||
|
||||
export const updateWebhook =
|
||||
({ updateWebhookConfig }: { updateWebhookConfig: UpdateWebhook }) =>
|
||||
async (
|
||||
webhook: Pick<Webhook, 'id'> &
|
||||
Partial<SetValuesNullable<Omit<Webhook, 'id' | 'updatedAt'>>>
|
||||
) => {
|
||||
const { id, streamId, url, description, secret, enabled, triggers } = webhook
|
||||
return await updateWebhookConfig({
|
||||
webhookId: id,
|
||||
webhookInput: {
|
||||
streamId: streamId ?? undefined,
|
||||
url: url ?? undefined,
|
||||
description: description ?? undefined,
|
||||
secret: secret ?? undefined,
|
||||
enabled: enabled ?? undefined,
|
||||
triggers: triggers ?? undefined,
|
||||
updatedAt: new Date()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
export const deleteWebhook =
|
||||
({
|
||||
deleteWebhookConfig,
|
||||
getWebhookById
|
||||
}: {
|
||||
deleteWebhookConfig: DeleteWebhook
|
||||
getWebhookById: GetWebhookById
|
||||
}) =>
|
||||
async ({ id, streamId }: { id: string; streamId: string }) => {
|
||||
const wh = await getWebhookById({ id })
|
||||
if (streamId !== wh?.streamId)
|
||||
throw new ForbiddenError(
|
||||
'The webhook id and stream id do not match. Please check your inputs.'
|
||||
)
|
||||
|
||||
await deleteWebhookConfig({ id })
|
||||
|
||||
return id
|
||||
}
|
||||
@@ -1,86 +0,0 @@
|
||||
'use strict'
|
||||
|
||||
const knex = require('@/db/knex')
|
||||
const { getStream } = require('@/modules/core/repositories/streams')
|
||||
const crs = require('crypto-random-string')
|
||||
|
||||
const WebhooksEvents = () => knex('webhooks_events')
|
||||
const Users = () => knex('users')
|
||||
|
||||
const { getServerInfo } = require('../../core/services/generic')
|
||||
|
||||
module.exports = {
|
||||
async dispatchStreamEvent({ streamId, event, eventPayload }, { trx } = {}) {
|
||||
// Add server info
|
||||
eventPayload.server = await getServerInfo()
|
||||
eventPayload.server.canonicalUrl = process.env.CANONICAL_URL
|
||||
delete eventPayload.server.id
|
||||
|
||||
// Add stream info
|
||||
if (eventPayload.streamId) {
|
||||
eventPayload.stream = await getStream(
|
||||
{
|
||||
streamId: eventPayload.streamId,
|
||||
userId: eventPayload.userId
|
||||
},
|
||||
{ trx }
|
||||
)
|
||||
}
|
||||
|
||||
// Add user info (except email and pwd)
|
||||
if (eventPayload.userId) {
|
||||
eventPayload.user = await Users()
|
||||
.where({ id: eventPayload.userId })
|
||||
.select('*')
|
||||
.first()
|
||||
if (eventPayload.user) {
|
||||
delete eventPayload.user.passwordDigest
|
||||
delete eventPayload.user.email
|
||||
}
|
||||
}
|
||||
|
||||
// with this select, we must have the streamid available on the webhook config,
|
||||
// even when the stream is deleted, to dispatch the stream deleted webhook events
|
||||
const { rows } = await knex.raw(
|
||||
`
|
||||
SELECT * FROM webhooks_config WHERE "streamId" = ?
|
||||
`,
|
||||
[streamId]
|
||||
)
|
||||
for (const wh of rows) {
|
||||
if (!wh.enabled) continue
|
||||
if (!(event in wh.triggers)) continue
|
||||
|
||||
// Add webhook info (the key `webhook` will be replaced for each webhook configured, before serializing the payload and storing it)
|
||||
eventPayload.webhook = wh
|
||||
eventPayload.webhook.triggers = Object.keys(eventPayload.webhook.triggers)
|
||||
delete eventPayload.webhook.secret
|
||||
|
||||
const q = WebhooksEvents().insert({
|
||||
id: crs({ length: 20 }),
|
||||
webhookId: wh.id,
|
||||
payload: JSON.stringify(eventPayload)
|
||||
})
|
||||
if (trx) q.transacting(trx)
|
||||
await q
|
||||
}
|
||||
},
|
||||
|
||||
async getLastWebhookEvents({ webhookId, limit }) {
|
||||
if (!limit) {
|
||||
limit = 100
|
||||
}
|
||||
|
||||
return await WebhooksEvents()
|
||||
.select('*')
|
||||
.where({ webhookId })
|
||||
.orderBy('lastUpdate', 'desc')
|
||||
.limit(limit)
|
||||
},
|
||||
|
||||
async getWebhookEventsCount({ webhookId }) {
|
||||
const [res] = await WebhooksEvents().count().where({ webhookId })
|
||||
|
||||
return parseInt(res.count)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,183 @@
|
||||
import { getServerInfo as getServerInfoFn } from '@/modules/core/services/generic'
|
||||
import { ForbiddenError } from '@/modules/shared/errors'
|
||||
import {
|
||||
CountWebhooksByStreamId,
|
||||
CreateWebhookConfig,
|
||||
CreateWebhookEvent,
|
||||
DeleteWebhookConfig,
|
||||
GetWebhookById,
|
||||
UpdateWebhookConfig
|
||||
} from '@/modules/webhooks/domain/operations'
|
||||
import { Webhook } from '@/modules/webhooks/domain/types'
|
||||
import { SetValuesNullable } from '@speckle/shared'
|
||||
import crs from 'crypto-random-string'
|
||||
import {
|
||||
StreamWithOptionalRole,
|
||||
getStream as getStreamFn
|
||||
} from '@/modules/core/repositories/streams'
|
||||
import {
|
||||
getUser as getUserFn,
|
||||
UserWithOptionalRole
|
||||
} from '@/modules/core/repositories/users'
|
||||
import { Knex } from 'knex'
|
||||
|
||||
const MAX_STREAM_WEBHOOKS = 100
|
||||
|
||||
export const createWebhookFactory =
|
||||
({
|
||||
createWebhookConfig,
|
||||
countWebhooksByStreamId
|
||||
}: {
|
||||
createWebhookConfig: CreateWebhookConfig
|
||||
countWebhooksByStreamId: CountWebhooksByStreamId
|
||||
}) =>
|
||||
async ({
|
||||
streamId,
|
||||
url,
|
||||
description,
|
||||
secret,
|
||||
enabled,
|
||||
triggers
|
||||
}: Pick<Webhook, 'streamId' | 'enabled' | 'triggers'> &
|
||||
Partial<SetValuesNullable<Pick<Webhook, 'url' | 'description' | 'secret'>>>) => {
|
||||
const streamWebhookCount = await countWebhooksByStreamId({ streamId })
|
||||
if (streamWebhookCount >= MAX_STREAM_WEBHOOKS) {
|
||||
throw new Error(
|
||||
`Maximum number of webhooks for a stream reached (${MAX_STREAM_WEBHOOKS})`
|
||||
)
|
||||
}
|
||||
|
||||
return await createWebhookConfig({
|
||||
id: crs({ length: 10 }),
|
||||
streamId,
|
||||
url: url ?? undefined,
|
||||
description: description ?? undefined,
|
||||
secret: secret ?? undefined,
|
||||
enabled,
|
||||
triggers
|
||||
})
|
||||
}
|
||||
|
||||
export const updateWebhookFactory =
|
||||
({ updateWebhookConfig }: { updateWebhookConfig: UpdateWebhookConfig }) =>
|
||||
async (
|
||||
webhook: Pick<Webhook, 'id'> &
|
||||
Partial<SetValuesNullable<Omit<Webhook, 'id' | 'updatedAt'>>>
|
||||
) => {
|
||||
const { id, streamId, url, description, secret, enabled, triggers } = webhook
|
||||
return await updateWebhookConfig({
|
||||
webhookId: id,
|
||||
webhookInput: {
|
||||
streamId: streamId ?? undefined,
|
||||
url: url ?? undefined,
|
||||
description: description ?? undefined,
|
||||
secret: secret ?? undefined,
|
||||
enabled: enabled ?? undefined,
|
||||
triggers: triggers ?? undefined,
|
||||
updatedAt: new Date()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
export const deleteWebhookFactory =
|
||||
({
|
||||
deleteWebhookConfig,
|
||||
getWebhookById
|
||||
}: {
|
||||
deleteWebhookConfig: DeleteWebhookConfig
|
||||
getWebhookById: GetWebhookById
|
||||
}) =>
|
||||
async ({ id, streamId }: { id: string; streamId: string }) => {
|
||||
const wh = await getWebhookById({ id })
|
||||
if (streamId !== wh?.streamId)
|
||||
throw new ForbiddenError(
|
||||
'The webhook id and stream id do not match. Please check your inputs.'
|
||||
)
|
||||
|
||||
await deleteWebhookConfig({ id })
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
export const dispatchStreamEventFactory =
|
||||
({
|
||||
db,
|
||||
getServerInfo,
|
||||
getStream,
|
||||
createWebhookEvent,
|
||||
getUser
|
||||
}: {
|
||||
db: Knex // TODO: this should not be injected here
|
||||
getServerInfo: typeof getServerInfoFn
|
||||
getStream: typeof getStreamFn
|
||||
createWebhookEvent: CreateWebhookEvent
|
||||
getUser: typeof getUserFn
|
||||
}) =>
|
||||
async (
|
||||
{
|
||||
streamId,
|
||||
event,
|
||||
eventPayload
|
||||
}: {
|
||||
streamId: string
|
||||
event: string
|
||||
eventPayload: {
|
||||
server: { id?: number; canonicalUrl?: string }
|
||||
streamId?: string
|
||||
stream?: StreamWithOptionalRole
|
||||
userId?: string
|
||||
user: Partial<UserWithOptionalRole> | null
|
||||
webhook: Webhook
|
||||
}
|
||||
},
|
||||
{ trx }: { trx?: Knex.Transaction } = {}
|
||||
) => {
|
||||
// Add server info
|
||||
eventPayload.server = await getServerInfo()
|
||||
eventPayload.server.canonicalUrl = process.env.CANONICAL_URL
|
||||
delete eventPayload.server.id
|
||||
|
||||
// Add stream info
|
||||
if (eventPayload.streamId) {
|
||||
eventPayload.stream = await getStream(
|
||||
{
|
||||
streamId: eventPayload.streamId,
|
||||
userId: eventPayload.userId
|
||||
},
|
||||
{ trx }
|
||||
)
|
||||
}
|
||||
|
||||
// Add user info (except email and pwd)
|
||||
if (eventPayload.userId) {
|
||||
eventPayload.user = await getUser(eventPayload.userId)
|
||||
if (eventPayload.user) {
|
||||
delete eventPayload.user.passwordDigest
|
||||
delete eventPayload.user.email
|
||||
}
|
||||
}
|
||||
|
||||
// with this select, we must have the streamid available on the webhook config,
|
||||
// even when the stream is deleted, to dispatch the stream deleted webhook events
|
||||
const { rows } = await db.raw(
|
||||
`
|
||||
SELECT * FROM webhooks_config WHERE "streamId" = ?
|
||||
`,
|
||||
[streamId]
|
||||
)
|
||||
for (const wh of rows) {
|
||||
if (!wh.enabled) continue
|
||||
if (!(event in wh.triggers)) continue
|
||||
|
||||
// Add webhook info (the key `webhook` will be replaced for each webhook configured, before serializing the payload and storing it)
|
||||
eventPayload.webhook = wh
|
||||
eventPayload.webhook.triggers = Object.keys(eventPayload.webhook.triggers)
|
||||
delete eventPayload.webhook.secret
|
||||
|
||||
await createWebhookEvent({
|
||||
id: crs({ length: 20 }),
|
||||
webhookId: wh.id,
|
||||
payload: JSON.stringify(eventPayload)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -9,28 +9,33 @@ const {
|
||||
} = require('@/test/hooks')
|
||||
const { noErrors } = require('@/test/helpers')
|
||||
const { createPersonalAccessToken } = require('../../core/services/tokens')
|
||||
const { getLastWebhookEvents, dispatchStreamEvent } = require('../services/webhooks')
|
||||
const { createUser } = require('../../core/services/users')
|
||||
const { createStream, grantPermissionsStream } = require('../../core/services/streams')
|
||||
const { Scopes, Roles } = require('@speckle/shared')
|
||||
const {
|
||||
createWebhookFactory,
|
||||
createWebhookConfigFactory,
|
||||
countWebhooksByStreamIdFactory,
|
||||
getWebhookByIdFactory,
|
||||
updateWebhookFactory,
|
||||
deleteWebhookFactory,
|
||||
getStreamWebhooksFactory
|
||||
updateWebhookConfigFactory,
|
||||
deleteWebhookConfigFactory,
|
||||
getStreamWebhooksFactory,
|
||||
createWebhookEventFactory,
|
||||
getLastWebhookEventsFactory
|
||||
} = require('@/modules/webhooks/repositories/webhooks')
|
||||
const { db } = require('@/db/knex')
|
||||
const {
|
||||
createWebhook,
|
||||
updateWebhook: updateWebhookService,
|
||||
deleteWebhook
|
||||
} = require('@/modules/webhooks/services/webhooks-new')
|
||||
createWebhookFactory,
|
||||
updateWebhookFactory,
|
||||
deleteWebhookFactory,
|
||||
dispatchStreamEventFactory
|
||||
} = require('@/modules/webhooks/services/webhooks')
|
||||
const { Users, Streams } = require('@/modules/core/dbSchema')
|
||||
const { getServerInfo } = require('@/modules/core/services/generic')
|
||||
const { getStream } = require('@/modules/core/repositories/streams')
|
||||
const { getUser } = require('@/modules/core/repositories/users')
|
||||
|
||||
const updateWebhook = updateWebhookService({
|
||||
updateWebhookConfig: updateWebhookFactory({ db })
|
||||
const updateWebhook = updateWebhookFactory({
|
||||
updateWebhookConfig: updateWebhookConfigFactory({ db })
|
||||
})
|
||||
const getStreamWebhooks = getStreamWebhooksFactory({ db })
|
||||
|
||||
@@ -82,8 +87,8 @@ describe('Webhooks @webhooks', () => {
|
||||
|
||||
describe('Create, Read, Update, Delete Webhooks', () => {
|
||||
it('Should create a webhook', async () => {
|
||||
webhookOne.id = await createWebhook({
|
||||
createWebhookConfig: createWebhookFactory({ db }),
|
||||
webhookOne.id = await createWebhookFactory({
|
||||
createWebhookConfig: createWebhookConfigFactory({ db }),
|
||||
countWebhooksByStreamId: countWebhooksByStreamIdFactory({ db })
|
||||
})(webhookOne)
|
||||
expect(webhookOne).to.have.property('id')
|
||||
@@ -98,8 +103,8 @@ describe('Webhooks @webhooks', () => {
|
||||
})
|
||||
|
||||
it('Should update a webhook', async () => {
|
||||
const webhookId = await createWebhook({
|
||||
createWebhookConfig: createWebhookFactory({ db }),
|
||||
const webhookId = await createWebhookFactory({
|
||||
createWebhookConfig: createWebhookConfigFactory({ db }),
|
||||
countWebhooksByStreamId: countWebhooksByStreamIdFactory({ db })
|
||||
})(webhookOne)
|
||||
|
||||
@@ -127,12 +132,12 @@ describe('Webhooks @webhooks', () => {
|
||||
enabled: true,
|
||||
triggers: ['commit_create', 'commit_update']
|
||||
}
|
||||
webhook.id = await createWebhook({
|
||||
createWebhookConfig: createWebhookFactory({ db }),
|
||||
webhook.id = await createWebhookFactory({
|
||||
createWebhookConfig: createWebhookConfigFactory({ db }),
|
||||
countWebhooksByStreamId: countWebhooksByStreamIdFactory({ db })
|
||||
})(webhook)
|
||||
await deleteWebhook({
|
||||
deleteWebhookConfig: deleteWebhookFactory({ db }),
|
||||
await deleteWebhookFactory({
|
||||
deleteWebhookConfig: deleteWebhookConfigFactory({ db }),
|
||||
getWebhookById: getWebhookByIdFactory({ db })
|
||||
})(webhook)
|
||||
const webhookDeleted = await getWebhookByIdFactory({ db })({ id: webhook.id })
|
||||
@@ -158,8 +163,8 @@ describe('Webhooks @webhooks', () => {
|
||||
enabled: true,
|
||||
triggers: ['commit_create', 'commit_update']
|
||||
}
|
||||
await createWebhook({
|
||||
createWebhookConfig: createWebhookFactory({ db }),
|
||||
await createWebhookFactory({
|
||||
createWebhookConfig: createWebhookConfigFactory({ db }),
|
||||
countWebhooksByStreamId: countWebhooksByStreamIdFactory({ db })
|
||||
})(webhook)
|
||||
streamWebhooks = await getStreamWebhooks({ streamId })
|
||||
@@ -187,16 +192,22 @@ describe('Webhooks @webhooks', () => {
|
||||
enabled: true,
|
||||
triggers: ['commit_create', 'commit_update']
|
||||
}
|
||||
const webhookId = await createWebhook({
|
||||
createWebhookConfig: createWebhookFactory({ db }),
|
||||
const webhookId = await createWebhookFactory({
|
||||
createWebhookConfig: createWebhookConfigFactory({ db }),
|
||||
countWebhooksByStreamId: countWebhooksByStreamIdFactory({ db })
|
||||
})(webhook)
|
||||
await dispatchStreamEvent({
|
||||
await dispatchStreamEventFactory({
|
||||
db,
|
||||
getServerInfo,
|
||||
getStream,
|
||||
createWebhookEvent: createWebhookEventFactory({ db }),
|
||||
getUser
|
||||
})({
|
||||
streamId,
|
||||
event: 'commit_create',
|
||||
eventPayload: { test: 'payload123' }
|
||||
})
|
||||
const lastEvents = await getLastWebhookEvents({ webhookId })
|
||||
const lastEvents = await getLastWebhookEventsFactory({ db })({ webhookId })
|
||||
expect(lastEvents).to.have.lengthOf(1)
|
||||
expect(JSON.parse(lastEvents[0].payload).test).to.equal('payload123')
|
||||
})
|
||||
@@ -259,7 +270,13 @@ describe('Webhooks @webhooks', () => {
|
||||
})
|
||||
|
||||
it('Should get stream webhooks and the previous events', async () => {
|
||||
await dispatchStreamEvent({
|
||||
await dispatchStreamEventFactory({
|
||||
db,
|
||||
getServerInfo,
|
||||
getStream,
|
||||
createWebhookEvent: createWebhookEventFactory({ db }),
|
||||
getUser
|
||||
})({
|
||||
streamId: streamTwo.id,
|
||||
event: 'commit_create',
|
||||
eventPayload: { test: 'payload321' }
|
||||
@@ -334,8 +351,8 @@ describe('Webhooks @webhooks', () => {
|
||||
enabled: true,
|
||||
triggers: ['commit_create', 'commit_update']
|
||||
}
|
||||
webhook.id = await createWebhook({
|
||||
createWebhookConfig: createWebhookFactory({ db }),
|
||||
webhook.id = await createWebhookFactory({
|
||||
createWebhookConfig: createWebhookConfigFactory({ db }),
|
||||
countWebhooksByStreamId: countWebhooksByStreamIdFactory({ db })
|
||||
})(webhook)
|
||||
const res = await sendRequest(userOne.token, {
|
||||
@@ -384,15 +401,15 @@ describe('Webhooks @webhooks', () => {
|
||||
triggers: ['commit_create', 'commit_update']
|
||||
}
|
||||
for (let i = 0; i < limit; i++) {
|
||||
await createWebhook({
|
||||
createWebhookConfig: createWebhookFactory({ db }),
|
||||
await createWebhookFactory({
|
||||
createWebhookConfig: createWebhookConfigFactory({ db }),
|
||||
countWebhooksByStreamId: countWebhooksByStreamIdFactory({ db })
|
||||
})(webhook)
|
||||
}
|
||||
|
||||
try {
|
||||
await createWebhook({
|
||||
createWebhookConfig: createWebhookFactory({ db }),
|
||||
await createWebhookFactory({
|
||||
createWebhookConfig: createWebhookConfigFactory({ db }),
|
||||
countWebhooksByStreamId: countWebhooksByStreamIdFactory({ db })
|
||||
})(webhook)
|
||||
} catch (err) {
|
||||
|
||||
Reference in New Issue
Block a user