Files
Kristaps Fabians Geikins b801442bac feat: saved views subscriptions (#5601)
* implemented

* init tests + fixes

* tests

* WIP FE subs

* WIP new post update handlers

* more post merge fixes

* switch over to full subs for post mutation updates

* moved over more post update to subs

* drag across groups seems to work again?

* group subs

* stuff seems to work
2025-10-02 11:58:08 +03:00

563 lines
17 KiB
TypeScript

/* eslint-disable @typescript-eslint/no-explicit-any */
import type { DocumentNode, FormattedExecutionResult } from 'graphql'
import type { GraphQLContext } from '@/modules/shared/helpers/typeHelper'
import type { TypedDocumentNode } from '@graphql-typed-document-node/core'
import { buildApolloServer, buildApolloSubscriptionServer } from '@/app'
import { buildContext } from '@/modules/shared/middleware'
import { Roles } from '@/modules/core/helpers/mainConstants'
import type {
MaybeAsync,
MaybeNullOrUndefined,
Optional,
ServerScope
} from '@speckle/shared'
import { AllScopes, buildManualPromise, ensureError, timeoutAt } from '@speckle/shared'
import { expect } from 'chai'
import type { ApolloServer, GraphQLResponse } from '@apollo/server'
import { getUserFactory } from '@/modules/core/repositories/users'
import { db } from '@/db/knex'
import { get, isUndefined, pick, set } from 'lodash-es'
import { isTestEnv } from '@/modules/shared/helpers/envHelper'
import { publish, TestSubscriptions } from '@/modules/shared/utils/subscriptions'
import cryptoRandomString from 'crypto-random-string'
import * as MockSocket from 'mock-socket'
import type ws from 'ws'
import { createAuthTokenForUser } from '@/test/authHelper'
import { SubscriptionClient } from 'subscriptions-transport-ws'
import { WebSocketLink } from '@apollo/client/link/ws/ws.cjs'
import { execute } from '@apollo/client/core'
import { PingPongDocument } from '@/modules/core/graph/generated/graphql'
import { BaseError } from '@/modules/shared/errors'
import EventEmitter from 'eventemitter2'
import { expectToThrow } from '@/test/assertionHelper'
import { testLogger } from '@/observability/logging'
type TypedGraphqlResponse<R = Record<string, any>> = GraphQLResponse<R>
const getUser = getUserFactory({ db })
export const getResponseResults = <TData = Record<string, unknown>>(
res: GraphQLResponse<TData>
) => {
const body = res.body
if (body.kind === 'incremental') {
return {
data: body.initialResult.data as MaybeNullOrUndefined<TData>,
errors: body.initialResult.errors
}
} else {
return {
data: body.singleResult.data as MaybeNullOrUndefined<TData>,
errors: body.singleResult.errors
}
}
}
export type ExecuteOperationResponse<R extends Record<string, any>> = {
res: TypedGraphqlResponse<R>
} & ReturnType<typeof getResponseResults<R>>
export type ServerAndContext = {
apollo: ApolloServer<GraphQLContext>
context?: MaybeNullOrUndefined<GraphQLContext>
}
export type ExecuteOperationServer = ServerAndContext
/**
* Use this to execute GQL operations from tests against an Apollo instance and get
* a properly typed response
* @deprecated Use `testApolloServer` instead
*/
export async function executeOperation<
R extends Record<string, any> = Record<string, any>,
V extends Record<string, any> = Record<string, any>
>(
apollo: ExecuteOperationServer,
query: DocumentNode,
variables?: V,
context?: GraphQLContext
): Promise<ExecuteOperationResponse<R>> {
const server: ApolloServer<GraphQLContext> = apollo.apollo
const contextValue = context || apollo.context || (await createTestContext())
const res = (await server.executeOperation(
{
query,
variables
},
{ contextValue }
)) as TypedGraphqlResponse<R>
const results = getResponseResults(res)
// Replicate clearing dataloaders/policies after each request
contextValue.clearCache()
return {
...results,
res
}
}
/**
* Create a test context for a GraphQL request. Optionally override any of the default values.
* By default the context will be unauthenticated
*/
export const createTestContext = async (
ctx?: Partial<GraphQLContext>
): Promise<GraphQLContext> =>
await buildContext({
authContext: {
auth: false,
userId: undefined,
role: undefined,
token: undefined,
scopes: [],
stream: undefined,
err: undefined,
...(ctx || {})
}
})
export const createAuthedTestContext = async (
userId: string,
ctxOverrides?: Partial<GraphQLContext>
): Promise<GraphQLContext> =>
await buildContext({
authContext: {
auth: true,
userId,
role: Roles.Server.User,
token: 'asd',
scopes: AllScopes,
...(ctxOverrides || {})
}
})
const buildMergedContext = async (params: {
/**
* Base/initial context, if any
*/
baseCtx?: GraphQLContext
/**
* Context overrides to apply at the very end
*/
contextOverrides?: Array<Partial<GraphQLContext>>
/**
* If set, adjust context to be authed w/ all scopes and the actual user role for this user id.
*/
authUserId?: string | null
}) => {
let baseCtx: GraphQLContext = params.baseCtx || (await createTestContext())
// Init ctx from userId?
if (params?.authUserId) {
const userData = await getUser(params.authUserId, { withRole: true })
const role = userData?.role || Roles.Server.User
const userCtx = await createAuthedTestContext(params.authUserId, { role })
// Apply authed context to base
baseCtx = {
...baseCtx,
...pick(userCtx, ['auth', 'userId', 'role', 'token', 'scopes'])
}
} else if (params?.authUserId === null) {
// Apply unauthed context to base
baseCtx = {
...baseCtx,
...pick(await createTestContext(), ['auth', 'userId', 'role', 'token', 'scopes'])
}
}
// If ctx passed in also - merge them
if (params?.contextOverrides?.length) {
for (const ctx of params.contextOverrides) {
baseCtx = {
...baseCtx,
...ctx
}
}
}
// Apply dataloaders from scratch
baseCtx = await createTestContext(baseCtx)
return baseCtx
}
/**
* Utilities that make it easier to test against an Apollo Server instance
*/
export const testApolloServer = async (params?: {
/**
* Pass in a context to use. If used together with authUserId, the two contexts will be merged w/ these
* overrides taking precedence
*/
context?: Partial<GraphQLContext>
/**
* If set, will create an authed context w/ all scopes and the actual user role for this user id.
* If user doesn't exist yet, will default to the User role
*/
authUserId?: string
}) => {
const baseCtx = await buildMergedContext({
authUserId: params?.authUserId,
contextOverrides: params?.context ? [params.context] : undefined
})
const instance = await buildApolloServer()
/**
* Execute an operation against Apollo and get a properly typed response
*/
const execute = async <
R extends Record<string, any> = Record<string, any>,
V extends Record<string, any> = Record<string, any>
>(
query: TypedDocumentNode<R, V>,
variables: V,
options?: Partial<{
/**
* Override context to use. If used together with authUserId, the two contexts will be merged w/ these
* overrides taking precedence
*/
context?: Partial<GraphQLContext>
/**
* If set, will create an authed context w/ all scopes and the actual user role for this user id.
* If user doesn't exist yet, will default to the User role
* Null means - set to anonymous
*/
authUserId?: string | null
/**
* Whether to add an assertion that there were no GQL errors
*/
assertNoErrors: boolean
}>
): Promise<ExecuteOperationResponse<R>> => {
const operationCtx =
!isUndefined(options?.authUserId) || options?.context
? await buildMergedContext({
baseCtx,
authUserId: options?.authUserId,
contextOverrides: [...(options?.context ? [options.context] : [])]
})
: undefined
// Re-apply createTestContext to reset dataloaders, authpolicy etc. state
const ctx = await createTestContext(operationCtx || baseCtx)
const res = (await instance.executeOperation(
{
query,
variables
},
{ contextValue: ctx }
)) as TypedGraphqlResponse<R>
const results = getResponseResults(res)
if (options?.assertNoErrors) {
expect(results).to.not.haveGraphQLErrors()
}
return {
...results,
res
}
}
return { execute, server: instance }
}
export type TestApolloServer = Awaited<ReturnType<typeof testApolloServer>>
export type ExecuteOperationOptions = Parameters<TestApolloServer['execute']>[2]
/**
* In test env we use a ping sub as a readiness signal for other subscriptions
* (there's no better way, no "is ready" event or anything)
*/
export const startEmittingTestSubs = async () => {
if (!isTestEnv()) return undefined
const intervalMs = 100
const interval = setInterval(async () => {
await publish(TestSubscriptions.Ping, { ping: new Date().toISOString() })
}, intervalMs)
return () => clearInterval(interval)
}
export class TestApolloSubscriptionError extends BaseError {
static code = 'TEST_APOLLO_SUBSCRIPTION_ERROR'
static defaultMessage = 'Unexpected issue occurred during test subscriptions'
}
/**
* Utilities for quickly/easily testing GQL subscriptions without having to build real network servers & connections
*/
export const testApolloSubscriptionServer = async () => {
const serverId = cryptoRandomString({ length: 16, type: 'url-safe' })
const serverUrl = `ws://${serverId}.fakeWsServer:1234/graphql`
const mockWsServer = new MockSocket.Server(serverUrl)
set(mockWsServer, 'removeListener', mockWsServer.off.bind(mockWsServer)) // backwards compat w/ subscriptions-transport-ws
const mockWs = MockSocket.WebSocket as unknown as ws.WebSocket
const apolloSubServer = await buildApolloSubscriptionServer({ server: mockWsServer })
// weakRef to ensure we dont prevent garbage collection
const clients: WeakRef<SubscriptionClient>[] = []
/**
* Build subscription client. One per user is ideal.
*/
const buildClient = async (params?: {
/**
* Real user id to auth the connection with. If unset, will be unauthenticated.
* Token will be given all scopes, unless overridden
*/
authUserId?: string
/**
* Optionally provide the scopes you want the token to have
*/
scopes?: ServerScope[]
}) => {
const { authUserId, scopes } = params || {}
const token = authUserId
? await createAuthTokenForUser(authUserId, scopes)
: undefined
const wsClient = new SubscriptionClient(
serverUrl,
{
reconnect: true,
connectionParams: { headers: token ? { Authorization: `Bearer ${token}` } : {} }
},
mockWs
)
clients.push(new WeakRef(wsClient))
const clientLink = new WebSocketLink(wsClient)
/**
* Subscribe and return a fn for unsubscribing
*/
const subscribe = async <
R extends Record<string, any> = Record<string, any>,
V extends Record<string, any> = Record<string, any>
>(
query: TypedDocumentNode<R, V>,
variables: V,
handler?: (res: FormattedExecutionResult<R>) => MaybeAsync<void>
) => {
const name = getOperationName(query)
const buildLogMsg = (msg: string) => (name ? `[${name}] ${msg}` : msg)
let processingErrors: unknown[] = []
const messages: Array<FormattedExecutionResult<R>> = []
const eventBus = new EventEmitter()
const errHandler = (e: unknown) => {
processingErrors.push(e)
}
eventBus.on('uncaughtException', errHandler)
eventBus.on('error', errHandler)
const observable = execute(clientLink, {
query,
variables
})
const sub = observable.subscribe(
async (eventData) => {
const res = eventData as FormattedExecutionResult<R>
const asyncHandler = async () => handler?.(res)
// Invoke handler
try {
await asyncHandler()
} catch (e) {
// If we throw here, this will be an unhandled rejection, lets throw in waitForMsg instead
eventBus.emit('error', e)
}
// Mark msg received
try {
messages.push(res)
await eventBus.emitAsync('message', res)
} catch (e) {
eventBus.emit('error', e)
}
},
(e) => {
errHandler(e)
testLogger.error(e, 'Test subscription subscribe error handler hit')
}
)
/**
* Unsubscribe from the subscription
*/
const unsub = () => {
eventBus.removeAllListeners()
sub.unsubscribe()
}
/**
* Wait for a message to come in - it should be near instantenous, but it sometimes might occur in next ticks
* due to the async nature of subscriptions
*/
const waitForMessage = async (
options?: Partial<{
/**
* Max time to wait for the message
* Defaults to: 200
*/
timeout: number
/**
* Whether to consider messages that have already arrived before the invocation of this function.
* This is useful cause sometimes the message might arrive before we even start waiting for it.
* Defaults to: true
*/
allowPreviousMessages: boolean
/**
* Optionally wait for a specific kind of message
*/
predicate: (msg: FormattedExecutionResult<R>) => boolean
}>
): Promise<FormattedExecutionResult<R>> => {
const { timeout = 200, allowPreviousMessages = true, predicate } = options || {}
// First check for previous errors
if (processingErrors.length) {
const firstErr = processingErrors[0]
processingErrors = []
throw firstErr
}
// Then lets check previous messages
if (allowPreviousMessages) {
const found = messages.find((msg) => !predicate || predicate(msg))
if (found) return found // Found it!
}
// Now lets wait for incoming ones
const retPromise = buildManualPromise<FormattedExecutionResult<R>>()
const unlisten = () => {
eventBus.removeListener('message', onMessage)
eventBus.removeListener('error', onError)
}
const onMessage = async (msg: FormattedExecutionResult<R>) => {
if (!predicate || predicate(msg)) {
retPromise.resolve(msg)
unlisten()
}
}
const onError = (e: unknown) => {
retPromise.reject(e)
unlisten()
processingErrors = []
}
eventBus.on('message', onMessage)
eventBus.on('error', onError)
try {
return await Promise.race([retPromise.promise, timeoutAt(timeout)])
} catch (e) {
throw new TestApolloSubscriptionError(
buildLogMsg('waitForMessage() failed'),
{
cause: ensureError(e)
}
)
}
}
/**
* Wrapper over waitForMessage() that does the inverse and expects a timeout
* to happen instead (no message should arrive)
*/
const waitForTimeout = async (...params: Parameters<typeof waitForMessage>) => {
const e = await expectToThrow(() => waitForMessage(...params))
if (!e.message.includes('timeout')) {
throw e
}
}
const getMessages = (
options?: Partial<{
/**
* Optionally check for a specific kind of message
*/
predicate: (msg: FormattedExecutionResult<R>) => boolean
}>
) => {
const { predicate } = options || {}
const msgs = messages.slice()
return predicate ? msgs.filter(predicate) : msgs
}
return { unsub, waitForMessage, getMessages, waitForTimeout }
}
/**
* Invoke this after subscribe() calls to ensure that your subscriptions are ready
*/
const waitForReadiness = async () => {
return new Promise<void>(async (resolve, reject) => {
const { unsub } = await subscribe(PingPongDocument, {}, (res) => {
if (!res.data?.ping) {
return reject(new TestApolloSubscriptionError('Unexpected ping error'))
}
unsub()
resolve()
})
timeoutAt(5000, 'waitForReadiness() timed out').catch(reject)
})
}
/**
* Close down the client
*/
const quit = () => {
wsClient.close()
}
return { subscribe, waitForReadiness, quit }
}
/**
* Close down server and all clients
*/
const quit = () => {
for (const client of clients) {
client.deref()?.close()
}
mockWsServer.close()
apolloSubServer.close()
}
return {
buildClient,
quit
}
}
export type TestApolloSubscriptionServer = Awaited<
ReturnType<typeof testApolloSubscriptionServer>
>
export type TestApolloSubscriptionClient = Awaited<
ReturnType<TestApolloSubscriptionServer['buildClient']>
>
const getOperationName = (query: DocumentNode) => {
const operation = query.definitions.find((def) => def.kind === 'OperationDefinition')
// doing this w/ a get() because of some weird Ts typing issues
const name = (
operation ? get(operation, 'name.value') : undefined
) as Optional<string>
return name
}