Merge branch 'main' into fabians/multiregion-testing3

This commit is contained in:
Kristaps Fabians Geikins
2024-11-12 09:59:58 +02:00
42 changed files with 829 additions and 598 deletions
+3 -1
View File
@@ -34,7 +34,9 @@
"prom-client": "^14.0.1",
"undici": "^5.28.4",
"valid-filename": "^3.1.0",
"web-ifc": "^0.0.36"
"web-ifc": "^0.0.36",
"znv": "^0.4.0",
"zod": "^3.22.4"
},
"devDependencies": {
"cross-env": "^7.0.3",
@@ -53,7 +53,7 @@ const emit = defineEmits<{
const props = defineProps<{
versions: ProjectModelPageDialogDeleteVersionFragment[]
open: boolean
projectId?: string
projectId: string
modelId?: string
}>()
@@ -71,10 +71,10 @@ const onDelete = async () => {
loading.value = true
const success = await deleteVersions(
{
projectId: props.projectId,
versionIds: props.versions.map((v) => v.id)
},
{
projectId: props.projectId,
modelId: props.modelId
}
)
@@ -57,6 +57,7 @@ const emit = defineEmits<{
}>()
const props = defineProps<{
projectId: string
version: Nullable<ProjectModelPageDialogDeleteVersionFragment>
open: boolean
}>()
@@ -85,6 +86,7 @@ const onSubmit = handleSubmit(async ({ newMessage }) => {
loading.value = true
const success = !!(await updateVersion({
projectId: props.projectId,
versionId: props.version?.id,
message: newMessage
}))
@@ -81,13 +81,13 @@ const onMove = async (targetModelName: string, newModelCreated?: boolean) => {
loading.value = true
const success = await moveVersions(
{
projectId: props.projectId,
versionIds: props.versions.map((v) => v.id),
targetModelName
},
{
previousModelId: props.modelId,
newModelCreated,
projectId: props.projectId
newModelCreated
}
)
loading.value = false
@@ -867,7 +867,8 @@ export type DeleteUserEmailInput = {
};
export type DeleteVersionsInput = {
versionIds: Array<Scalars['String']['input']>;
projectId: Scalars['ID']['input'];
versionIds: Array<Scalars['ID']['input']>;
};
export enum DiscoverableStreamsSortType {
@@ -1200,9 +1201,10 @@ export type ModelsTreeItemCollection = {
};
export type MoveVersionsInput = {
projectId: Scalars['ID']['input'];
/** If the name references a nonexistant model, it will be created */
targetModelName: Scalars['String']['input'];
versionIds: Array<Scalars['String']['input']>;
versionIds: Array<Scalars['ID']['input']>;
};
export type Mutation = {
@@ -3493,7 +3495,8 @@ export type UpdateServerRegionInput = {
/** Only non-null values will be updated */
export type UpdateVersionInput = {
message?: InputMaybe<Scalars['String']['input']>;
versionId: Scalars['String']['input'];
projectId: Scalars['ID']['input'];
versionId: Scalars['ID']['input'];
};
/**
@@ -350,7 +350,6 @@ export function useDeleteVersions() {
* Various options for better cache updates, set if possible
*/
options?: Partial<{
projectId: string
modelId: string
}>
) => {
@@ -372,26 +371,21 @@ export function useDeleteVersions() {
}
// Update totalCounts in project
if (options?.projectId) {
modifyObjectFields<ProjectVersionsArgs, Project['versions']>(
cache,
getCacheId('Project', options.projectId),
(_fieldName, _variables, data) => {
return {
...data,
...(!isUndefined(data.totalCount)
? {
totalCount: Math.max(
data.totalCount - input.versionIds.length,
0
)
}
: {})
}
},
{ fieldNameWhitelist: ['versions'] }
)
}
modifyObjectFields<ProjectVersionsArgs, Project['versions']>(
cache,
getCacheId('Project', input.projectId),
(_fieldName, _variables, data) => {
return {
...data,
...(!isUndefined(data.totalCount)
? {
totalCount: Math.max(data.totalCount - input.versionIds.length, 0)
}
: {})
}
},
{ fieldNameWhitelist: ['versions'] }
)
// Update totalCounts in model
if (options?.modelId) {
@@ -458,7 +452,6 @@ export function useMoveVersions() {
options?: Partial<{
previousModelId: string
newModelCreated: boolean
projectId: string
}>
) => {
if (!input.versionIds.length || !input.targetModelName.trim()) return
@@ -551,8 +544,8 @@ export function useMoveVersions() {
{ fieldNameWhitelist: ['versions'] }
)
if (options?.newModelCreated && options?.projectId) {
evictProjectModels(options.projectId)
if (options?.newModelCreated) {
evictProjectModels(input.projectId)
}
}
})
+2 -1
View File
@@ -59,7 +59,8 @@
"tarn": "^3.0.2",
"yargs": "^17.3.0",
"zlib": "^1.0.5",
"zod": "^3.23.8"
"znv": "^0.4.0",
"zod": "^3.22.4"
},
"devDependencies": {
"@aws-sdk/client-s3": "^3.645.0",
@@ -160,7 +160,8 @@ type ModelMutations {
}
input MoveVersionsInput {
versionIds: [String!]!
projectId: ID!
versionIds: [ID!]!
"""
If the name references a nonexistant model, it will be created
"""
@@ -168,14 +169,16 @@ input MoveVersionsInput {
}
input DeleteVersionsInput {
versionIds: [String!]!
projectId: ID!
versionIds: [ID!]!
}
"""
Only non-null values will be updated
"""
input UpdateVersionInput {
versionId: String!
projectId: ID!
versionId: ID!
message: String
}
+24 -48
View File
@@ -1,5 +1,4 @@
/* eslint-disable no-restricted-imports */
/* eslint-disable camelcase */
/* istanbul ignore file */
import { packageRoot } from './bootstrap'
import fs from 'fs'
@@ -10,8 +9,14 @@ import {
postgresMaxConnections,
isDevOrTestEnv
} from '@/modules/shared/helpers/envHelper'
import { dbLogger as logger } from './logging/logging'
import { dbLogger as logger } from '@/logging/logging'
import { Knex } from 'knex'
import {
createKnexConfig,
configureKnexClient,
KnexConfigArgs,
RegionServerConfig
} from '@speckle/shared/dist/commonjs/environment/multiRegionConfig.js'
function walk(dir: string) {
let results: string[] = []
@@ -69,67 +74,38 @@ if (env.POSTGRES_USER && env.POSTGRES_PASSWORD) {
// this is why the new datetime columns are created like this
// table.specificType('createdAt', 'TIMESTAMPTZ(3)').defaultTo(knex.fn.now())
export const createKnexConfig = ({
connectionString,
caCertificate
}: {
connectionString?: string
caCertificate?: string | undefined
}): Knex.Config => {
return {
client: 'pg',
migrations: {
extension: 'ts',
loadExtensions: isTestEnv() ? ['.js', '.ts'] : ['.js'],
directory: migrationDirs
},
log: {
warn(message: unknown) {
logger.warn(message)
},
error(message: unknown) {
logger.error(message)
},
deprecate(message: unknown) {
logger.info(message)
},
debug(message: unknown) {
logger.debug(message)
}
},
connection: {
connectionString,
ssl: caCertificate ? { ca: caCertificate, rejectUnauthorized: true } : undefined,
application_name: 'speckle_server'
},
// we wish to avoid leaking sql queries in the logs: https://knexjs.org/guide/#compilesqlonerror
compileSqlOnError: isDevOrTestEnv(),
asyncStackTraces: isDevOrTestEnv(),
pool: {
min: 0,
max: postgresMaxConnections(),
acquireTimeoutMillis: 16000, //allows for 3x creation attempts plus idle time between attempts
createTimeoutMillis: 5000
}
}
const configArgs: KnexConfigArgs = {
migrationDirs,
isTestEnv: isTestEnv(),
isDevOrTestEnv: isDevOrTestEnv(),
applicationName: 'speckle_server',
logger,
maxConnections: postgresMaxConnections()
}
const config: Record<string, Knex.Config> = {
test: {
...createKnexConfig({
connectionString: connectionUri || 'postgres://127.0.0.1/speckle2_test'
connectionString: connectionUri || 'postgres://127.0.0.1/speckle2_test',
...configArgs
})
},
development: {
...createKnexConfig({
connectionString: connectionUri || 'postgres://127.0.0.1/speckle2_dev'
connectionString: connectionUri || 'postgres://127.0.0.1/speckle2_dev',
...configArgs
})
},
production: {
...createKnexConfig({
connectionString: connectionUri
connectionString: connectionUri,
...configArgs
})
}
}
export const configureClient = (config: RegionServerConfig) => {
return configureKnexClient(config, configArgs)
}
export default config
@@ -27,6 +27,7 @@ import { Knex } from 'knex'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { getUserFactory } from '@/modules/core/repositories/users'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
const tables = {
streamActivity: <T extends object = StreamActivityRecord>(db: Knex) =>
@@ -253,11 +254,15 @@ export const saveActivityFactory =
}
}
const projectDb = await getProjectDbClient({ projectId: streamId })
// yes, we're manually instantiating this thing here, but i do not want to go through all the places,
// where we're calling saveActivity!
// the whole activity module will need to be refactored to use the eventBus
await dispatchStreamEventFactory({
getStreamWebhooks: getStreamWebhooksFactory({ db }),
getStreamWebhooks: getStreamWebhooksFactory({ db: projectDb }),
getServerInfo: getServerInfoFactory({ db }),
getStream: getStreamFactory({ db }),
createWebhookEvent: createWebhookEventFactory({ db }),
getStream: getStreamFactory({ db: projectDb }),
createWebhookEvent: createWebhookEventFactory({ db: projectDb }),
getUser: getUserFactory({ db })
})({
streamId,
@@ -5,7 +5,6 @@ import {
} from '@/modules/shared/utils/subscriptions'
import {
CommitCreateInput,
CommitReceivedInput,
CommitUpdateInput,
ProjectVersionsUpdatedMessageType,
UpdateVersionInput
@@ -40,7 +39,7 @@ export const addCommitCreatedActivityFactory =
modelId: string
commit: CommitRecord
}) => {
const { commitId, input, streamId, userId, branchName, commit } = params
const { commitId, input, streamId, userId, branchName, commit, modelId } = params
await Promise.all([
saveActivity({
streamId,
@@ -53,7 +52,7 @@ export const addCommitCreatedActivityFactory =
commit: {
...input,
projectId: streamId,
modelId: params.modelId,
modelId,
versionId: commit.id
}
},
@@ -67,7 +66,7 @@ export const addCommitCreatedActivityFactory =
projectId: streamId,
projectVersionsUpdated: {
id: commit.id,
version: commit,
version: { ...commit, streamId },
type: ProjectVersionsUpdatedMessageType.Created,
modelId: null
}
@@ -123,7 +122,7 @@ export const addCommitUpdatedActivityFactory =
projectId: streamId,
projectVersionsUpdated: {
id: commitId,
version: newCommit,
version: { ...newCommit, streamId },
type: ProjectVersionsUpdatedMessageType.Updated,
modelId: null
}
@@ -162,7 +161,7 @@ export const addCommitMovedActivityFactory =
projectId: streamId,
projectVersionsUpdated: {
id: commitId,
version: commit,
version: { ...commit, streamId },
type: ProjectVersionsUpdatedMessageType.Updated,
modelId: null
}
@@ -211,22 +210,3 @@ export const addCommitDeletedActivityFactory =
})
])
}
export const addCommitReceivedActivityFactory =
({ saveActivity }: { saveActivity: SaveActivity }) =>
async (params: { input: CommitReceivedInput; userId: string }) => {
const { input, userId } = params
await saveActivity({
streamId: input.streamId,
resourceType: ResourceTypes.Commit,
resourceId: input.commitId,
actionType: ActionTypes.Commit.Receive,
userId,
info: {
sourceApplication: input.sourceApplication,
message: input.message
},
message: `Commit ${input.commitId} was received by user ${userId}`
})
}
@@ -855,10 +855,9 @@ export = (FF_AUTOMATE_MODULE_ENABLED
}
},
User: {
automateInfo: () => ({
hasAutomateGithubApp: false,
availableGithubOrgs: []
})
automateInfo: () => {
throw new AutomateApiDisabledError()
}
},
ServerInfo: {
automate: () => ({
@@ -1,12 +1,12 @@
import { Branch } from '@/modules/core/domain/branches/types'
import {
CommitWithBranchId,
CommitWithStreamBranchMetadata,
Commit,
CommitBranch,
CommitWithStreamId,
LegacyUserCommit,
LegacyStreamCommit
LegacyStreamCommit,
CommitWithStreamBranchId
} from '@/modules/core/domain/commits/types'
import {
CommitsDeleteInput,
@@ -54,7 +54,7 @@ export type GetSpecificBranchCommits = (
branchId: string
commitId: string
}[]
) => Promise<CommitWithBranchId[]>
) => Promise<CommitWithStreamBranchId[]>
export type StoreCommit = (
params: Omit<NullableKeysToOptional<Commit>, 'id' | 'createdAt'>
@@ -74,7 +74,7 @@ export type CreateCommitByBranchId = (
options?: Partial<{
notify: boolean
}>
) => Promise<Commit>
) => Promise<CommitWithStreamBranchId>
export type CreateCommitByBranchName = (
params: NullableKeysToOptional<{
@@ -109,7 +109,7 @@ export type InsertStreamCommits = (
export type UpdateCommitAndNotify = (
params: CommitUpdateInput | UpdateVersionInput,
userId: string
) => Promise<Commit>
) => Promise<CommitWithStreamBranchId>
export type GetCommitBranches = (commitIds: string[]) => Promise<CommitBranch[]>
@@ -166,7 +166,7 @@ export type GetUserAuthoredCommitCounts = (params: {
export type GetCommitsAndTheirBranchIds = (
commitIds: string[]
) => Promise<CommitWithBranchId[]>
) => Promise<CommitWithStreamBranchId[]>
export type GetBatchedStreamCommits = (
streamId: string,
@@ -203,7 +203,7 @@ export type PaginatedBranchCommitsParams = PaginatedBranchCommitsBaseParams & {
export type GetPaginatedBranchCommitsItems = (
params: PaginatedBranchCommitsParams
) => Promise<{
commits: Commit[]
commits: CommitWithStreamBranchId[]
cursor: string | null
}>
@@ -217,7 +217,7 @@ export type GetPaginatedBranchCommits = (
}
) => Promise<{
totalCount: number
items: Commit[]
items: CommitWithStreamBranchId[]
cursor: string | null
}>
@@ -14,9 +14,12 @@ export type CommitWithBranchId = Commit & {
export type CommitWithStreamId = Commit & { streamId: string }
export type BranchLatestCommit = CommitWithBranchId
export type CommitWithStreamBranchMetadata = Commit & {
export type CommitWithStreamBranchId = Commit & {
streamId: string
branchId: string
}
export type CommitWithStreamBranchMetadata = CommitWithStreamBranchId & {
branchName: string
}
@@ -31,6 +34,7 @@ export type LegacyUserCommit = {
parents: CommitRecord['parents']
createdAt: CommitRecord['createdAt']
branchName: BranchRecord['name']
branchId: BranchRecord['id']
streamId: StreamCommitRecord['streamId']
streamName: StreamRecord['name']
authorName: UserRecord['name']
@@ -47,6 +51,8 @@ export type LegacyStreamCommit = {
parents: CommitRecord['parents']
createdAt: CommitRecord['createdAt']
branchName: BranchRecord['name']
branchId: BranchRecord['id']
streamId: StreamCommitRecord['streamId']
authorName: UserRecord['name']
authorId: UserRecord['id']
authorAvatar: UserRecord['avatar']
@@ -89,6 +89,10 @@ import {
getUsersFactory,
UserWithOptionalRole
} from '@/modules/core/repositories/users'
import {
CommitWithStreamBranchId,
CommitWithStreamBranchMetadata
} from '@/modules/core/domain/commits/types'
declare module '@/modules/core/loaders' {
interface ModularizedDataLoaders extends ReturnType<typeof dataLoadersDefinition> {}
@@ -170,14 +174,17 @@ const dataLoadersDefinition = defineRequestDataloaders(
* thus its own query.
*/
getStreamCommit: (() => {
type CommitDataLoader = DataLoader<string, Nullable<CommitRecord>>
type CommitDataLoader = DataLoader<
string,
Nullable<CommitWithStreamBranchMetadata>
>
const streamCommitLoaders = new Map<string, CommitDataLoader>()
return {
clearAll: () => streamCommitLoaders.clear(),
forStream(streamId: string): CommitDataLoader {
let loader = streamCommitLoaders.get(streamId)
if (!loader) {
loader = createLoader<string, Nullable<CommitRecord>>(
loader = createLoader<string, Nullable<CommitWithStreamBranchMetadata>>(
async (commitIds) => {
const results = keyBy(
await getCommits(commitIds.slice(), { streamId }),
@@ -365,7 +372,7 @@ const dataLoadersDefinition = defineRequestDataloaders(
}),
getBranchCommit: createLoader<
{ branchId: string; commitId: string },
Nullable<CommitRecord>,
Nullable<CommitWithStreamBranchId>,
string
>(
async (idPairs) => {
@@ -889,7 +889,8 @@ export type DeleteUserEmailInput = {
};
export type DeleteVersionsInput = {
versionIds: Array<Scalars['String']['input']>;
projectId: Scalars['ID']['input'];
versionIds: Array<Scalars['ID']['input']>;
};
export enum DiscoverableStreamsSortType {
@@ -1222,9 +1223,10 @@ export type ModelsTreeItemCollection = {
};
export type MoveVersionsInput = {
projectId: Scalars['ID']['input'];
/** If the name references a nonexistant model, it will be created */
targetModelName: Scalars['String']['input'];
versionIds: Array<Scalars['String']['input']>;
versionIds: Array<Scalars['ID']['input']>;
};
export type Mutation = {
@@ -3515,7 +3517,8 @@ export type UpdateServerRegionInput = {
/** Only non-null values will be updated */
export type UpdateVersionInput = {
message?: InputMaybe<Scalars['String']['input']>;
versionId: Scalars['String']['input'];
projectId: Scalars['ID']['input'];
versionId: Scalars['ID']['input'];
};
/**
@@ -11,7 +11,7 @@ import {
legacyGetPaginatedStreamCommitsFactory
} from '@/modules/core/services/commit/retrieval'
import {
markCommitReceivedAndNotify,
markCommitReceivedAndNotifyFactory,
deleteCommitAndNotifyFactory,
createCommitByBranchIdFactory,
createCommitByBranchNameFactory,
@@ -352,7 +352,10 @@ export = {
context.resourceAccessRules
)
await markCommitReceivedAndNotify({
await markCommitReceivedAndNotifyFactory({
getCommit: getCommitFactory({ db }),
saveActivity: saveActivityFactory({ db })
})({
input: args.input,
userId: context.userId!
})
@@ -61,99 +61,68 @@ import {
} from '@/modules/core/repositories/streams'
import { ModelsEmitter } from '@/modules/core/events/modelsEmitter'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
const markBranchStreamUpdated = markBranchStreamUpdatedFactory({ db })
const getStream = getStreamFactory({ db })
const getStreamObjects = getStreamObjectsFactory({ db })
const getViewerResourceGroups = getViewerResourceGroupsFactory({
getStreamObjects,
getBranchLatestCommits: getBranchLatestCommitsFactory({ db }),
getStreamBranchesByName: getStreamBranchesByNameFactory({ db }),
getSpecificBranchCommits: getSpecificBranchCommitsFactory({ db }),
getAllBranchCommits: getAllBranchCommitsFactory({ db })
})
const getPaginatedProjectModels = getPaginatedProjectModelsFactory({
getPaginatedProjectModelsItems: getPaginatedProjectModelsItemsFactory({ db }),
getPaginatedProjectModelsTotalCount: getPaginatedProjectModelsTotalCountFactory({
db
})
})
const getModelTreeItems = getModelTreeItemsFactory({ db })
const getProjectTopLevelModelsTree = getProjectTopLevelModelsTreeFactory({
getModelTreeItemsFiltered: getModelTreeItemsFilteredFactory({ db }),
getModelTreeItemsFilteredTotalCount: getModelTreeItemsFilteredTotalCountFactory({
db
}),
getModelTreeItems,
getModelTreeItemsTotalCount: getModelTreeItemsTotalCountFactory({ db })
})
const createBranchAndNotify = createBranchAndNotifyFactory({
getStreamBranchByName: getStreamBranchByNameFactory({ db }),
createBranch: createBranchFactory({ db }),
addBranchCreatedActivity: addBranchCreatedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})
})
const updateBranchAndNotify = updateBranchAndNotifyFactory({
getBranchById: getBranchByIdFactory({ db }),
updateBranch: updateBranchFactory({ db }),
addBranchUpdatedActivity: addBranchUpdatedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})
})
const deleteBranchAndNotify = deleteBranchAndNotifyFactory({
getStream,
getBranchById: getBranchByIdFactory({ db }),
modelsEventsEmitter: ModelsEmitter.emit,
markBranchStreamUpdated,
addBranchDeletedActivity: addBranchDeletedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
}),
deleteBranchById: deleteBranchByIdFactory({ db })
})
const getPaginatedBranchCommits = getPaginatedBranchCommitsFactory({
getSpecificBranchCommits: getSpecificBranchCommitsFactory({ db }),
getPaginatedBranchCommitsItems: getPaginatedBranchCommitsItemsFactory({ db }),
getBranchCommitsTotalCount: getBranchCommitsTotalCountFactory({ db })
})
const getPaginatedStreamCommits = legacyGetPaginatedStreamCommitsFactory({
legacyGetPaginatedStreamCommitsPage: legacyGetPaginatedStreamCommitsPageFactory({
db
}),
getStreamCommitCount: getStreamCommitCountFactory({ db })
})
import {
getProjectDbClient,
getRegisteredRegionClients
} from '@/modules/multiregion/dbSelector'
export = {
User: {
async versions(parent, args, ctx) {
const authoredOnly = args.authoredOnly
const regionClients = await getRegisteredRegionClients()
const allLoaders = [
ctx.loaders,
...Object.values(regionClients).map((db) => ctx.loaders.forRegion({ db }))
]
let counts: number[]
if (authoredOnly) {
counts = await Promise.all(
allLoaders.map((loader) =>
loader.users.getAuthoredCommitCount.load(parent.id)
)
)
} else {
counts = await Promise.all(
allLoaders.map((loader) => loader.users.getStreamCommitCount.load(parent.id))
)
}
return {
totalCount: authoredOnly
? await ctx.loaders.users.getAuthoredCommitCount.load(parent.id)
: await ctx.loaders.users.getStreamCommitCount.load(parent.id)
totalCount: counts.reduce((acc, curr) => acc + curr, 0)
}
}
},
Project: {
async models(parent, args, ctx) {
const projectDB = await getProjectDbClient({ projectId: parent.id })
// If limit=0 & no filter, short-cut full execution and use data loader
if (args.limit === 0 && !args.filter) {
return {
totalCount: await ctx.loaders.streams.getBranchCount.load(parent.id),
totalCount: await ctx.loaders
.forRegion({ db: projectDB })
.streams.getBranchCount.load(parent.id),
items: [],
cursor: null
}
}
const getPaginatedProjectModels = getPaginatedProjectModelsFactory({
getPaginatedProjectModelsItems: getPaginatedProjectModelsItemsFactory({
db: projectDB
}),
getPaginatedProjectModelsTotalCount: getPaginatedProjectModelsTotalCountFactory(
{
db: projectDB
}
)
})
return await getPaginatedProjectModels(parent.id, args)
},
async model(_parent, args, ctx) {
const model = await ctx.loaders.branches.getById.load(args.id)
async model(parent, args, ctx) {
const projectDB = await getProjectDbClient({ projectId: parent.id })
const model = await ctx.loaders
.forRegion({ db: projectDB })
.branches.getById.load(args.id)
if (!model) {
throw new BranchNotFoundError('Model not found')
}
@@ -161,8 +130,10 @@ export = {
return model
},
async modelByName(parent, args, ctx) {
const model = await ctx.loaders.streams.getStreamBranchByName
.forStream(parent.id)
const projectDB = await getProjectDbClient({ projectId: parent.id })
const model = await ctx.loaders
.forRegion({ db: projectDB })
.streams.getStreamBranchByName.forStream(parent.id)
.load(args.name)
if (!model) {
throw new BranchNotFoundError('Model not found')
@@ -171,9 +142,25 @@ export = {
return model
},
async modelsTree(parent, args) {
const projectDB = await getProjectDbClient({ projectId: parent.id })
const getModelTreeItems = getModelTreeItemsFactory({ db: projectDB })
const getProjectTopLevelModelsTree = getProjectTopLevelModelsTreeFactory({
getModelTreeItemsFiltered: getModelTreeItemsFilteredFactory({ db: projectDB }),
getModelTreeItemsFilteredTotalCount: getModelTreeItemsFilteredTotalCountFactory(
{
db: projectDB
}
),
getModelTreeItems,
getModelTreeItemsTotalCount: getModelTreeItemsTotalCountFactory({
db: projectDB
})
})
return await getProjectTopLevelModelsTree(parent.id, args)
},
async modelChildrenTree(parent, { fullName }) {
const projectDB = await getProjectDbClient({ projectId: parent.id })
const getModelTreeItems = getModelTreeItemsFactory({ db: projectDB })
return await getModelTreeItems(
parent.id,
{},
@@ -183,6 +170,15 @@ export = {
)
},
async viewerResources(parent, { resourceIdString, loadedVersionsOnly }) {
const projectDB = await getProjectDbClient({ projectId: parent.id })
const getStreamObjects = getStreamObjectsFactory({ db: projectDB })
const getViewerResourceGroups = getViewerResourceGroupsFactory({
getStreamObjects,
getBranchLatestCommits: getBranchLatestCommitsFactory({ db: projectDB }),
getStreamBranchesByName: getStreamBranchesByNameFactory({ db: projectDB }),
getSpecificBranchCommits: getSpecificBranchCommitsFactory({ db: projectDB }),
getAllBranchCommits: getAllBranchCommitsFactory({ db: projectDB })
})
return await getViewerResourceGroups({
projectId: parent.id,
resourceIdString,
@@ -190,6 +186,7 @@ export = {
})
},
async versions(parent, args, ctx) {
const projectDB = await getProjectDbClient({ projectId: parent.id })
// If limit=0, short-cut full execution and use data loader
if (args.limit === 0) {
return {
@@ -201,6 +198,14 @@ export = {
}
}
const getPaginatedStreamCommits = legacyGetPaginatedStreamCommitsFactory({
legacyGetPaginatedStreamCommitsPage: legacyGetPaginatedStreamCommitsPageFactory(
{
db: projectDB
}
),
getStreamCommitCount: getStreamCommitCountFactory({ db: projectDB })
})
return await getPaginatedStreamCommits(parent.id, args)
}
},
@@ -209,11 +214,16 @@ export = {
return await ctx.loaders.users.getUser.load(parent.authorId)
},
async previewUrl(parent, _args, ctx) {
const latestCommit = await ctx.loaders.branches.getLatestCommit.load(parent.id)
const projectDB = await getProjectDbClient({ projectId: parent.streamId })
const latestCommit = await ctx.loaders
.forRegion({ db: projectDB })
.branches.getLatestCommit.load(parent.id)
const path = `/preview/${parent.streamId}/commits/${latestCommit?.id || ''}`
return latestCommit ? new URL(path, getServerOrigin()).toString() : null
},
async childrenTree(parent) {
const projectDB = await getProjectDbClient({ projectId: parent.streamId })
const getModelTreeItems = getModelTreeItemsFactory({ db: projectDB })
return await getModelTreeItems(
parent.streamId,
{},
@@ -226,15 +236,25 @@ export = {
return last(parent.name.split('/'))
},
async versions(parent, args, ctx) {
const projectDB = await getProjectDbClient({ projectId: parent.streamId })
// If limit=0 & no filter, short-cut full execution and use data loader
if (!args.filter && args.limit === 0) {
return {
totalCount: await ctx.loaders.branches.getCommitCount.load(parent.id),
totalCount: await ctx.loaders
.forRegion({ db: projectDB })
.branches.getCommitCount.load(parent.id),
items: [],
cursor: null
}
}
const getPaginatedBranchCommits = getPaginatedBranchCommitsFactory({
getSpecificBranchCommits: getSpecificBranchCommitsFactory({ db: projectDB }),
getPaginatedBranchCommitsItems: getPaginatedBranchCommitsItemsFactory({
db: projectDB
}),
getBranchCommitsTotalCount: getBranchCommitsTotalCountFactory({ db: projectDB })
})
return await getPaginatedBranchCommits({
branchId: parent.id,
cursor: args.cursor,
@@ -243,10 +263,13 @@ export = {
})
},
async version(parent, args, ctx) {
const version = await ctx.loaders.branches.getBranchCommit.load({
branchId: parent.id,
commitId: args.id
})
const projectDB = await getProjectDbClient({ projectId: parent.streamId })
const version = await ctx.loaders
.forRegion({ db: projectDB })
.branches.getBranchCommit.load({
branchId: parent.id,
commitId: args.id
})
if (!version) {
throw new CommitNotFoundError('Version not found')
}
@@ -256,11 +279,15 @@ export = {
},
ModelsTreeItem: {
async model(parent, _args, ctx) {
return await ctx.loaders.streams.getStreamBranchByName
.forStream(parent.projectId)
const projectDB = await getProjectDbClient({ projectId: parent.projectId })
return await ctx.loaders
.forRegion({ db: projectDB })
.streams.getStreamBranchByName.forStream(parent.projectId)
.load(parent.fullName)
},
async children(parent) {
const projectDB = await getProjectDbClient({ projectId: parent.projectId })
const getModelTreeItems = getModelTreeItemsFactory({ db: projectDB })
return await getModelTreeItems(
parent.projectId,
{},
@@ -281,6 +308,15 @@ export = {
Roles.Stream.Contributor,
ctx.resourceAccessRules
)
const projectDB = await getProjectDbClient({ projectId: args.input.projectId })
const createBranchAndNotify = createBranchAndNotifyFactory({
getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDB }),
createBranch: createBranchFactory({ db: projectDB }),
addBranchCreatedActivity: addBranchCreatedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})
})
return await createBranchAndNotify(args.input, ctx.userId!)
},
async update(_parent, args, ctx) {
@@ -290,6 +326,15 @@ export = {
Roles.Stream.Contributor,
ctx.resourceAccessRules
)
const projectDB = await getProjectDbClient({ projectId: args.input.projectId })
const updateBranchAndNotify = updateBranchAndNotifyFactory({
getBranchById: getBranchByIdFactory({ db: projectDB }),
updateBranch: updateBranchFactory({ db: projectDB }),
addBranchUpdatedActivity: addBranchUpdatedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})
})
return await updateBranchAndNotify(args.input, ctx.userId!)
},
async delete(_parent, args, ctx) {
@@ -299,6 +344,20 @@ export = {
Roles.Stream.Contributor,
ctx.resourceAccessRules
)
const projectDB = await getProjectDbClient({ projectId: args.input.projectId })
const markBranchStreamUpdated = markBranchStreamUpdatedFactory({ db: projectDB })
const getStream = getStreamFactory({ db })
const deleteBranchAndNotify = deleteBranchAndNotifyFactory({
getStream,
getBranchById: getBranchByIdFactory({ db: projectDB }),
modelsEventsEmitter: ModelsEmitter.emit,
markBranchStreamUpdated,
addBranchDeletedActivity: addBranchDeletedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
}),
deleteBranchById: deleteBranchByIdFactory({ db: projectDB })
})
return await deleteBranchAndNotify(args.input, ctx.userId!)
}
},
@@ -10,19 +10,19 @@ import {
} from '@/modules/core/repositories/objects'
import { db } from '@/db/knex'
import { createObjectsFactory } from '@/modules/core/services/objects/management'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
const getObject = getObjectFactory({ db })
const createObjects = createObjectsFactory({
storeObjectsIfNotFoundFactory: storeObjectsIfNotFoundFactory({ db }),
storeClosuresIfNotFound: storeClosuresIfNotFoundFactory({ db })
})
const getObjectChildren = getObjectChildrenFactory({ db })
const getObjectChildrenQuery = getObjectChildrenQueryFactory({ db })
type GetObjectChildrenQueryParams = Parameters<typeof getObjectChildrenQuery>[0]
type GetObjectChildrenQueryParams = Parameters<
ReturnType<typeof getObjectChildrenQueryFactory>
>[0]
const getStreamObject: NonNullable<Resolvers['Stream']>['object'] =
async function object(parent, args) {
return (await getObject(args.id, parent.id)) || null
return (
(await getObjectFactory({
db: await getProjectDbClient({ projectId: parent.id })
})(args.id, parent.id)) || null
)
}
export = {
@@ -34,8 +34,10 @@ export = {
},
Object: {
async children(parent, args) {
const projectDB = await getProjectDbClient({ projectId: parent.streamId })
// The simple query branch
if (!args.query && !args.orderBy) {
const getObjectChildren = getObjectChildrenFactory({ db })
const result = await getObjectChildren({
streamId: parent.streamId,
objectId: parent.id,
@@ -60,6 +62,7 @@ export = {
}
}
const getObjectChildrenQuery = getObjectChildrenQueryFactory({ db: projectDB })
// The complex query branch
const result = await getObjectChildrenQuery({
streamId: parent.streamId,
@@ -98,6 +101,13 @@ export = {
context.resourceAccessRules
)
const projectDB = await getProjectDbClient({
projectId: args.objectInput.streamId
})
const createObjects = createObjectsFactory({
storeObjectsIfNotFoundFactory: storeObjectsIfNotFoundFactory({ db: projectDB }),
storeClosuresIfNotFound: storeClosuresIfNotFoundFactory({ db: projectDB })
})
const ids = await createObjects({
streamId: args.objectInput.streamId,
objects: args.objectInput.objects.filter(isNonNullable)
@@ -354,7 +354,12 @@ export = {
}))
},
async sourceApps(parent, _args, ctx) {
return ctx.loaders.streams.getSourceApps.load(parent.id) || []
const projectDB = await getProjectDbClient({ projectId: parent.id })
return (
ctx.loaders
.forRegion({ db: projectDB })
.streams.getSourceApps.load(parent.id) || []
)
},
async visibility(parent) {
@@ -14,7 +14,7 @@ import {
import { CommitNotFoundError, CommitUpdateError } from '@/modules/core/errors/commit'
import {
createCommitByBranchIdFactory,
markCommitReceivedAndNotify,
markCommitReceivedAndNotifyFactory,
updateCommitAndNotifyFactory
} from '@/modules/core/services/commit/management'
import {
@@ -56,69 +56,15 @@ import {
} from '@/modules/activitystream/services/commitActivity'
import { getObjectFactory } from '@/modules/core/repositories/objects'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
const markCommitStreamUpdated = markCommitStreamUpdatedFactory({ db })
const getCommitStream = getCommitStreamFactory({ db })
const getStream = getStreamFactory({ db })
const getStreams = getStreamsFactory({ db })
const getObject = getObjectFactory({ db })
const createCommitByBranchId = createCommitByBranchIdFactory({
createCommit: createCommitFactory({ db }),
getObject,
getBranchById: getBranchByIdFactory({ db }),
insertStreamCommits: insertStreamCommitsFactory({ db }),
insertBranchCommits: insertBranchCommitsFactory({ db }),
markCommitStreamUpdated,
markCommitBranchUpdated: markCommitBranchUpdatedFactory({ db }),
versionsEventEmitter: VersionsEmitter.emit,
addCommitCreatedActivity: addCommitCreatedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})
})
const updateCommitAndNotify = updateCommitAndNotifyFactory({
getCommit: getCommitFactory({ db }),
getStream,
getCommitStream,
getStreamBranchByName: getStreamBranchByNameFactory({ db }),
getCommitBranch: getCommitBranchFactory({ db }),
switchCommitBranch: switchCommitBranchFactory({ db }),
updateCommit: updateCommitFactory({ db }),
addCommitUpdatedActivity: addCommitUpdatedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
}),
markCommitStreamUpdated,
markCommitBranchUpdated: markCommitBranchUpdatedFactory({ db })
})
const batchMoveCommits = batchMoveCommitsFactory({
getCommits: getCommitsFactory({ db }),
getStreams,
getStreamBranchByName: getStreamBranchByNameFactory({ db }),
createBranch: createBranchFactory({ db }),
moveCommitsToBranch: moveCommitsToBranchFactory({ db }),
addCommitMovedActivity: addCommitMovedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})
})
const batchDeleteCommits = batchDeleteCommitsFactory({
getCommits: getCommitsFactory({ db }),
getStreams,
deleteCommits: deleteCommitsFactory({ db }),
addCommitDeletedActivity: addCommitDeletedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})
})
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
export = {
Project: {
async version(parent, args, ctx) {
const version = await ctx.loaders.streams.getStreamCommit
.forStream(parent.id)
const projectDB = await getProjectDbClient({ projectId: parent.id })
const version = await ctx.loaders
.forRegion({ db: projectDB })
.streams.getStreamCommit.forStream(parent.id)
.load(args.id)
if (!version) {
throw new CommitNotFoundError('Version not found')
@@ -131,13 +77,23 @@ export = {
async authorUser(parent, _args, ctx) {
const { author } = parent
if (!author) return null
return (await ctx.loaders.users.getUser.load(author)) || null
const projectDB = await getProjectDbClient({ projectId: parent.streamId })
return (
(await ctx.loaders.forRegion({ db: projectDB }).users.getUser.load(author)) ||
null
)
},
async model(parent, _args, ctx) {
return await ctx.loaders.commits.getCommitBranch.load(parent.id)
const projectDB = await getProjectDbClient({ projectId: parent.streamId })
return await ctx.loaders
.forRegion({ db: projectDB })
.commits.getCommitBranch.load(parent.id)
},
async previewUrl(parent, _args, ctx) {
const stream = await ctx.loaders.commits.getCommitStream.load(parent.id)
const projectDB = await getProjectDbClient({ projectId: parent.streamId })
const stream = await ctx.loaders
.forRegion({ db: projectDB })
.commits.getCommitStream.load(parent.id)
const path = `/preview/${stream!.id}/commits/${parent.id}`
return new URL(path, getServerOrigin()).toString()
}
@@ -147,16 +103,47 @@ export = {
},
VersionMutations: {
async moveToModel(_parent, args, ctx) {
// TODO: how to get streamId here?
const projectId = args.input.projectId
const projectDb = await getProjectDbClient({ projectId })
const batchMoveCommits = batchMoveCommitsFactory({
getCommits: getCommitsFactory({ db: projectDb }),
getStreams: getStreamsFactory({ db: projectDb }),
getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }),
createBranch: createBranchFactory({ db: projectDb }),
moveCommitsToBranch: moveCommitsToBranchFactory({ db: projectDb }),
addCommitMovedActivity: addCommitMovedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})
})
return await batchMoveCommits(args.input, ctx.userId!)
},
async delete(_parent, args, ctx) {
// TODO: how to get streamId here?
const projectId = args.input.projectId
const projectDb = await getProjectDbClient({ projectId })
const batchDeleteCommits = batchDeleteCommitsFactory({
getCommits: getCommitsFactory({ db: projectDb }),
getStreams: getStreamsFactory({ db: projectDb }),
deleteCommits: deleteCommitsFactory({ db: projectDb }),
addCommitDeletedActivity: addCommitDeletedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})
})
await batchDeleteCommits(args.input, ctx.userId!)
return true
},
async update(_parent, args, ctx) {
const stream = await ctx.loaders.commits.getCommitStream.load(
args.input.versionId
)
// TODO: how to get streamId here?
const projectId = args.input.projectId
const projectDb = await getProjectDbClient({ projectId })
const stream = await ctx.loaders
.forRegion({ db: projectDb })
.commits.getCommitStream.load(args.input.versionId)
if (!stream) {
throw new CommitUpdateError('Commit stream not found')
}
@@ -167,6 +154,22 @@ export = {
Roles.Stream.Contributor,
ctx.resourceAccessRules
)
const updateCommitAndNotify = updateCommitAndNotifyFactory({
getCommit: getCommitFactory({ db: projectDb }),
getStream: getStreamFactory({ db: projectDb }),
getCommitStream: getCommitStreamFactory({ db: projectDb }),
getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }),
getCommitBranch: getCommitBranchFactory({ db: projectDb }),
switchCommitBranch: switchCommitBranchFactory({ db: projectDb }),
updateCommit: updateCommitFactory({ db: projectDb }),
addCommitUpdatedActivity: addCommitUpdatedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
}),
markCommitStreamUpdated: markCommitStreamUpdatedFactory({ db: projectDb }),
markCommitBranchUpdated: markCommitBranchUpdatedFactory({ db: projectDb })
})
return await updateCommitAndNotify(args.input, ctx.userId!)
},
async create(_parent, args, ctx) {
@@ -182,6 +185,23 @@ export = {
throw new RateLimitError(rateLimitResult)
}
const projectDb = await getProjectDbClient({ projectId: args.input.projectId })
const createCommitByBranchId = createCommitByBranchIdFactory({
createCommit: createCommitFactory({ db: projectDb }),
getObject: getObjectFactory({ db: projectDb }),
getBranchById: getBranchByIdFactory({ db: projectDb }),
insertStreamCommits: insertStreamCommitsFactory({ db: projectDb }),
insertBranchCommits: insertBranchCommitsFactory({ db: projectDb }),
markCommitStreamUpdated: markCommitStreamUpdatedFactory({ db: projectDb }),
markCommitBranchUpdated: markCommitBranchUpdatedFactory({ db: projectDb }),
versionsEventEmitter: VersionsEmitter.emit,
addCommitCreatedActivity: addCommitCreatedActivityFactory({
saveActivity: saveActivityFactory({ db }),
publish
})
})
const commit = await createCommitByBranchId({
authorId: ctx.userId!,
streamId: args.input.projectId,
@@ -202,8 +222,12 @@ export = {
Roles.Stream.Reviewer,
ctx.resourceAccessRules
)
const projectDb = await getProjectDbClient({ projectId: args.input.projectId })
await markCommitReceivedAndNotify({
await markCommitReceivedAndNotifyFactory({
getCommit: getCommitFactory({ db: projectDb }),
saveActivity: saveActivityFactory({ db })
})({
input: args.input,
userId: ctx.userId!
})
@@ -1,4 +1,5 @@
import {
CommitWithStreamBranchId,
LegacyStreamCommit,
LegacyUserCommit
} from '@/modules/core/domain/commits/types'
@@ -43,7 +44,7 @@ export type ProjectGraphQLReturn = StreamGraphQLReturn
export type ModelGraphQLReturn = BranchRecord
export type VersionGraphQLReturn = CommitRecord
export type VersionGraphQLReturn = CommitWithStreamBranchId
export type LimitedUserGraphQLReturn = Omit<
LimitedUser,
@@ -623,7 +623,7 @@ export const getModelTreeItemsFactory =
options
)
const finalQuery = knex.from(query.as('sq1'))
const finalQuery = deps.db.from(query.as('sq1'))
finalQuery.limit(limit)
if (args.cursor) {
@@ -23,6 +23,7 @@ import {
import { Knex } from 'knex'
import { MaybeNullOrUndefined, Optional } from '@speckle/shared'
import {
CommitWithStreamBranchId,
CommitWithStreamBranchMetadata,
LegacyStreamCommit,
LegacyUserCommit
@@ -251,7 +252,7 @@ export const getCommitsAndTheirBranchIdsFactory =
return await tables
.commits(deps.db)
.select<Array<CommitRecord & { branchId: string }>>([
.select<Array<CommitWithStreamBranchId>>([
...Commits.cols,
BranchCommits.col.branchId
])
@@ -269,16 +270,19 @@ export const getSpecificBranchCommitsFactory =
const q = tables
.commits(deps.db)
.select<Array<CommitRecord & { branchId: string }>>([
.select<Array<CommitWithStreamBranchId>>([
...Commits.cols,
BranchCommits.col.branchId
knex.raw(`(array_agg(??))[1] as "branchId"`, [BranchCommits.col.branchId]),
knex.raw(`(array_agg(??))[1] as "streamId"`, [StreamCommits.col.streamId])
])
.innerJoin(BranchCommits.name, BranchCommits.col.commitId, Commits.col.id)
.innerJoin(StreamCommits.name, StreamCommits.col.commitId, Commits.col.id)
.whereIn(Commits.col.id, commitIds)
.whereIn(BranchCommits.col.branchId, branchIds)
.groupBy(Commits.col.id)
const queryResults = await q
const results: Array<CommitRecord & { branchId: string }> = []
const results: Array<CommitWithStreamBranchId> = []
for (const pair of pairs) {
const commit = queryResults.find(
@@ -294,13 +298,18 @@ export const getSpecificBranchCommitsFactory =
const getPaginatedBranchCommitsBaseQueryFactory =
(deps: { db: Knex }) =>
<T = CommitRecord[]>(params: PaginatedBranchCommitsBaseParams) => {
<T = CommitWithStreamBranchId[]>(params: PaginatedBranchCommitsBaseParams) => {
const { branchId, filter } = params
const q = tables
.commits(deps.db)
.select<T>(Commits.cols)
.select<T>([
...Commits.cols,
knex.raw(`(array_agg(??))[1] as "branchId"`, [BranchCommits.col.branchId]),
knex.raw(`(array_agg(??))[1] as "streamId"`, [StreamCommits.col.streamId])
])
.innerJoin(BranchCommits.name, BranchCommits.col.commitId, Commits.col.id)
.innerJoin(StreamCommits.name, StreamCommits.col.commitId, Commits.col.id)
.innerJoin(Branches.name, Branches.col.id, BranchCommits.col.branchId)
.where(Branches.col.id, branchId)
.groupBy(Commits.col.id)
@@ -338,7 +347,7 @@ export const getBranchCommitsTotalCountFactory =
(deps: { db: Knex }): GetBranchCommitsTotalCount =>
async (params: PaginatedBranchCommitsBaseParams) => {
const baseQ = getPaginatedBranchCommitsBaseQueryFactory(deps)(params)
const q = knex.count<{ count: string }[]>().from(baseQ.as('sq1'))
const q = deps.db.count<{ count: string }[]>().from(baseQ.as('sq1'))
const [res] = await q
return parseInt(res?.count || '0')
@@ -1,11 +1,10 @@
import { db } from '@/db/knex'
import {
AddCommitCreatedActivity,
AddCommitDeletedActivity,
AddCommitUpdatedActivity
AddCommitUpdatedActivity,
SaveActivity
} from '@/modules/activitystream/domain/operations'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import { addCommitReceivedActivityFactory } from '@/modules/activitystream/services/commitActivity'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
import {
GetBranchById,
GetStreamBranchByName,
@@ -47,43 +46,50 @@ import {
MarkReceivedVersionInput,
UpdateVersionInput
} from '@/modules/core/graph/generated/graphql'
import { CommitRecord } from '@/modules/core/helpers/types'
import { getCommitFactory } from '@/modules/core/repositories/commits'
import { BranchRecord, CommitRecord } from '@/modules/core/helpers/types'
import { ensureError, Roles } from '@speckle/shared'
import { has } from 'lodash'
export async function markCommitReceivedAndNotify(params: {
input: MarkReceivedVersionInput | CommitReceivedInput
userId: string
}) {
const { input, userId } = params
export const markCommitReceivedAndNotifyFactory =
({ getCommit, saveActivity }: { getCommit: GetCommit; saveActivity: SaveActivity }) =>
async (params: {
input: MarkReceivedVersionInput | CommitReceivedInput
userId: string
}) => {
const { input, userId } = params
const oldInput: CommitReceivedInput =
'projectId' in input
? {
...input,
streamId: input.projectId,
commitId: input.versionId
}
: input
const oldInput: CommitReceivedInput =
'projectId' in input
? {
...input,
streamId: input.projectId,
commitId: input.versionId
}
: input
const commit = await getCommitFactory({ db })(oldInput.commitId, {
streamId: oldInput.streamId
})
if (!commit) {
throw new CommitReceiveError(
`Failed to find commit with id ${oldInput.commitId} in stream ${oldInput.streamId}.`,
{ info: params }
)
}
await addCommitReceivedActivityFactory({ saveActivity: saveActivityFactory({ db }) })(
{
input: oldInput,
userId
const commit = await getCommit(oldInput.commitId, {
streamId: oldInput.streamId
})
if (!commit) {
throw new CommitReceiveError(
`Failed to find commit with id ${oldInput.commitId} in stream ${oldInput.streamId}.`,
{ info: params }
)
}
)
}
await saveActivity({
streamId: oldInput.streamId,
resourceType: ResourceTypes.Commit,
resourceId: oldInput.commitId,
actionType: ActionTypes.Commit.Receive,
userId,
info: {
sourceApplication: input.sourceApplication,
message: input.message
},
message: `Commit ${oldInput.commitId} was received by user ${userId}`
})
}
export const createCommitByBranchIdFactory =
(deps: {
@@ -172,7 +178,7 @@ export const createCommitByBranchIdFactory =
: [])
])
return commit
return { ...commit, streamId, branchId }
}
export const createCommitByBranchNameFactory =
@@ -288,21 +294,20 @@ export const updateCommitAndNotifyFactory =
)
}
let branch: BranchRecord | undefined = await deps.getCommitBranch(commitId)
if (newBranchName) {
try {
const [newBranch, oldBranch] = await Promise.all([
deps.getStreamBranchByName(streamId, newBranchName),
deps.getCommitBranch(commitId)
])
const newBranch = await deps.getStreamBranchByName(streamId, newBranchName)
if (!newBranch || !oldBranch) {
if (!newBranch || !branch) {
throw new Error("Couldn't resolve branch")
}
if (!commit) {
throw new Error("Couldn't find commit")
}
await deps.switchCommitBranch(commitId, newBranch.id, oldBranch.id)
await deps.switchCommitBranch(commitId, newBranch.id, branch.id)
branch = newBranch
} catch (e) {
throw new CommitUpdateError('Failed to update commit branch', {
cause: ensureError(e),
@@ -326,13 +331,14 @@ export const updateCommitAndNotifyFactory =
newCommit
})
await Promise.all([
deps.markCommitStreamUpdated(commit.id),
deps.markCommitBranchUpdated(commit.id)
const [updatedBranch] = await Promise.all([
deps.markCommitBranchUpdated(commit.id),
deps.markCommitStreamUpdated(commit.id)
])
branch = updatedBranch
}
return newCommit
return { ...newCommit, streamId: stream.id, branchId: branch!.id }
}
export const deleteCommitAndNotifyFactory =
@@ -870,7 +870,8 @@ export type DeleteUserEmailInput = {
};
export type DeleteVersionsInput = {
versionIds: Array<Scalars['String']['input']>;
projectId: Scalars['ID']['input'];
versionIds: Array<Scalars['ID']['input']>;
};
export enum DiscoverableStreamsSortType {
@@ -1203,9 +1204,10 @@ export type ModelsTreeItemCollection = {
};
export type MoveVersionsInput = {
projectId: Scalars['ID']['input'];
/** If the name references a nonexistant model, it will be created */
targetModelName: Scalars['String']['input'];
versionIds: Array<Scalars['String']['input']>;
versionIds: Array<Scalars['ID']['input']>;
};
export type Mutation = {
@@ -3496,7 +3498,8 @@ export type UpdateServerRegionInput = {
/** Only non-null values will be updated */
export type UpdateVersionInput = {
message?: InputMaybe<Scalars['String']['input']>;
versionId: Scalars['String']['input'];
projectId: Scalars['ID']['input'];
versionId: Scalars['ID']['input'];
};
/**
@@ -13,16 +13,15 @@ import {
GetRegionDb
} from '@/modules/multiregion/services/projectRegion'
import { getGenericRedis } from '@/modules/shared/redis/redis'
import knex, { Knex } from 'knex'
import { Knex } from 'knex'
import { getRegionFactory, getRegionsFactory } from '@/modules/multiregion/repositories'
import { MisconfiguredEnvironmentError } from '@/modules/shared/errors'
import { createKnexConfig } from '@/knexfile'
import { configureClient } from '@/knexfile'
import { InitializeRegion } from '@/modules/multiregion/domain/operations'
import {
getAvailableRegionConfig,
getMainRegionConfig
} from '@/modules/multiregion/regionConfig'
import { RegionServerConfig } from '@/modules/multiregion/domain/types'
import { MaybeNullOrUndefined } from '@speckle/shared'
import { isTestEnv } from '@/modules/shared/helpers/envHelper'
@@ -47,7 +46,7 @@ export const getRegionDb: GetRegionDb = async ({ regionKey }) => {
throw new Error(`RegionKey ${regionKey} not available in config`)
const newRegionConfig = regionConfigs[regionKey]
const regionDb = configureKnexClient(newRegionConfig).public
const regionDb = configureClient(newRegionConfig).public
regionClients[regionKey] = regionDb
}
@@ -109,7 +108,7 @@ export const initializeRegisteredRegionClients = async (): Promise<RegionClients
throw new MisconfiguredEnvironmentError(
`Missing region config for ${region.key} region`
)
return [region.key, configureKnexClient(regionConfigs[region.key]).public]
return [region.key, configureClient(regionConfigs[region.key]).public]
})
)
@@ -125,24 +124,6 @@ export const initializeRegisteredRegionClients = async (): Promise<RegionClients
return ret
}
const configureKnexClient = (
config: RegionServerConfig
): { public: Knex; private?: Knex } => {
const knexConfig = createKnexConfig({
connectionString: config.postgres.connectionUri,
caCertificate: config.postgres.publicTlsCertificate
})
const privateConfig = config.postgres.privateConnectionUri
? knex(
createKnexConfig({
connectionString: config.postgres.privateConnectionUri,
caCertificate: config.postgres.publicTlsCertificate
})
)
: undefined
return { public: knex(knexConfig), private: privateConfig }
}
export const getRegisteredRegionClients = async (): Promise<RegionClients> => {
if (!registeredRegionClients)
registeredRegionClients = await initializeRegisteredRegionClients()
@@ -161,11 +142,11 @@ export const initializeRegion: InitializeRegion = async ({ regionKey }) => {
throw new Error(`RegionKey ${regionKey} not available in config`)
const newRegionConfig = regionConfigs[regionKey]
const regionDb = configureKnexClient(newRegionConfig)
const regionDb = configureClient(newRegionConfig)
await regionDb.public.migrate.latest()
const mainDbConfig = await getMainRegionConfig()
const mainDb = configureKnexClient(mainDbConfig)
const mainDb = configureClient(mainDbConfig)
const sslmode = newRegionConfig.postgres.publicTlsCertificate ? 'require' : 'disable'
@@ -1,6 +1,6 @@
import {
MultiRegionConfig,
ProjectRegion,
DataRegionsConfig,
RegionKey,
ServerRegion
} from '@/modules/multiregion/domain/types'
@@ -18,7 +18,7 @@ export type UpdateRegion = (params: {
region: Partial<ServerRegion>
}) => Promise<ServerRegion>
export type GetAvailableRegionConfig = () => Promise<MultiRegionConfig>
export type GetAvailableRegionConfig = () => Promise<DataRegionsConfig>
export type GetAvailableRegionKeys = () => Promise<string[]>
export type GetFreeRegionKeys = () => Promise<string[]>
@@ -1,16 +1,11 @@
import { z } from 'zod'
import {
multiRegionConfigSchema,
regionServerConfigSchema
} from '@/modules/multiregion/helpers/validation'
import { RegionRecord } from '@/modules/multiregion/helpers/types'
import { Nullable } from '@speckle/shared'
import {
DataRegionsConfig,
RegionServerConfig
} from '@speckle/shared/dist/commonjs/environment/multiRegionConfig.js'
export type AllRegionsConfig = z.infer<typeof multiRegionConfigSchema>
export type MainRegionConfig = AllRegionsConfig['main']
export type MultiRegionConfig = AllRegionsConfig['regions']
export type RegionServerConfig = z.infer<typeof regionServerConfigSchema>
export { RegionServerConfig, DataRegionsConfig }
export type ServerRegion = RegionRecord
export type RegionKey = Nullable<string>
@@ -1,34 +0,0 @@
import { z } from 'zod'
export const regionServerConfigSchema = z.object({
postgres: z.object({
connectionUri: z
.string()
.describe(
'Full Postgres connection URI (e.g. "postgres://user:password@host:port/dbname")'
),
privateConnectionUri: z
.string()
.describe(
'Full Postgres connection URI in VPN or Docker networks (e.g. "postgres://user:password@host:port/dbname")'
)
.optional(),
publicTlsCertificate: z
.string()
.describe('Public TLS ("CA") certificate for the Postgres server')
.optional()
})
//TODO - add the rest of the config when blob storage is implemented
// blobStorage: z
// .object({
// endpoint: z.string().url(),
// accessKey: z.string(),
// secretKey: z.string(),
// bucket: z.string()
// })
})
export const multiRegionConfigSchema = z.object({
main: regionServerConfigSchema,
regions: z.record(z.string(), regionServerConfigSchema)
})
@@ -1,69 +1,44 @@
import { GetAvailableRegionConfig } from '@/modules/multiregion/domain/operations'
import { AllRegionsConfig } from '@/modules/multiregion/domain/types'
import { packageRoot } from '@/bootstrap'
import path from 'node:path'
import fs from 'node:fs/promises'
import {
getMultiRegionConfigPath,
isDevOrTestEnv
} from '@/modules/shared/helpers/envHelper'
import { type Optional } from '@speckle/shared'
import { multiRegionConfigSchema } from '@/modules/multiregion/helpers/validation'
import { MisconfiguredEnvironmentError } from '@/modules/shared/errors'
import { get } from 'lodash'
import { isMultiRegionEnabled } from '@/modules/multiregion/helpers'
import {
MainRegionConfig,
MultiRegionConfig,
loadMultiRegionsConfig
} from '@speckle/shared/dist/commonjs/environment/multiRegionConfig.js'
let multiRegionConfig: Optional<AllRegionsConfig> = undefined
let multiRegionConfig: Optional<MultiRegionConfig> = undefined
const getAllRegionsConfig = async (): Promise<AllRegionsConfig> => {
const getMultiRegionConfig = async (): Promise<MultiRegionConfig> => {
if (isDevOrTestEnv() && !isMultiRegionEnabled()) {
// this should throw somehow
return { main: { postgres: { connectionUri: '' } }, regions: {} }
}
if (multiRegionConfig) return multiRegionConfig
const relativePath = getMultiRegionConfigPath()
if (!multiRegionConfig) {
const relativePath = getMultiRegionConfigPath()
const fullPath = path.resolve(packageRoot, relativePath)
const configPath = path.resolve(packageRoot, relativePath)
let file: string
try {
file = await fs.readFile(fullPath, 'utf-8')
} catch (e) {
if (get(e, 'code') === 'ENOENT') {
throw new MisconfiguredEnvironmentError(
`Multi-region config file not found at path: ${fullPath}`
)
}
throw e
multiRegionConfig = await loadMultiRegionsConfig({
path: configPath
})
}
let parsedJson: string
try {
parsedJson = JSON.parse(file) // This will throw if the file is not valid JSON
} catch (e) {
throw new MisconfiguredEnvironmentError(
`Multi-region config file at path '${fullPath}' is not valid JSON`
)
}
const multiRegionConfigFileResult = multiRegionConfigSchema.safeParse(parsedJson) // This will throw if the config is invalid
if (!multiRegionConfigFileResult.success)
throw new MisconfiguredEnvironmentError(
`Multi-region config file at path '${fullPath}' does not fit the schema`,
{ cause: multiRegionConfigFileResult.error, info: { parsedJson } }
)
multiRegionConfig = multiRegionConfigFileResult.data
return multiRegionConfig
}
export const getMainRegionConfig = async (): Promise<AllRegionsConfig['main']> => {
return (await getAllRegionsConfig()).main
export const getMainRegionConfig = async (): Promise<MainRegionConfig> => {
return (await getMultiRegionConfig()).main
}
export const getAvailableRegionConfig: GetAvailableRegionConfig = async () => {
return (await getAllRegionsConfig()).regions
return (await getMultiRegionConfig()).regions
}
@@ -1,4 +1,4 @@
import { MultiRegionConfig } from '@/modules/multiregion/domain/types'
import { DataRegionsConfig } from '@/modules/multiregion/domain/types'
import { Regions } from '@/modules/multiregion/repositories'
import { BasicTestUser, createTestUser } from '@/test/authHelper'
import {
@@ -27,7 +27,7 @@ describe('Multi Region Server Settings', () => {
const fakeRegionKey1 = 'us-west-1'
const fakeRegionKey2 = 'eu-east-2'
const fakeRegionConfig: MultiRegionConfig = {
const fakeRegionConfig: DataRegionsConfig = {
[fakeRegionKey1]: {
postgres: {
connectionUri: 'postgres://user:password@uswest1:port/dbname'
@@ -183,7 +183,7 @@ type SubscriptionTypeMap = {
payload: {
projectVersionsUpdated: Merge<
ProjectVersionsUpdatedMessage,
{ version: Nullable<VersionGraphQLReturn> }
{ version: Nullable<Omit<VersionGraphQLReturn, 'branchId'>> }
>
projectId: string
}
@@ -871,7 +871,8 @@ export type DeleteUserEmailInput = {
};
export type DeleteVersionsInput = {
versionIds: Array<Scalars['String']['input']>;
projectId: Scalars['ID']['input'];
versionIds: Array<Scalars['ID']['input']>;
};
export enum DiscoverableStreamsSortType {
@@ -1204,9 +1205,10 @@ export type ModelsTreeItemCollection = {
};
export type MoveVersionsInput = {
projectId: Scalars['ID']['input'];
/** If the name references a nonexistant model, it will be created */
targetModelName: Scalars['String']['input'];
versionIds: Array<Scalars['String']['input']>;
versionIds: Array<Scalars['ID']['input']>;
};
export type Mutation = {
@@ -3497,7 +3499,8 @@ export type UpdateServerRegionInput = {
/** Only non-null values will be updated */
export type UpdateVersionInput = {
message?: InputMaybe<Scalars['String']['input']>;
versionId: Scalars['String']['input'];
projectId: Scalars['ID']['input'];
versionId: Scalars['ID']['input'];
};
/**
+2
View File
@@ -38,6 +38,7 @@
},
"peerDependencies": {
"@tiptap/core": "^2.0.0-beta.176",
"knex": "*",
"mixpanel": "^0.17.0",
"pino": "^8.7.0",
"pino-http": "^8.0.0",
@@ -55,6 +56,7 @@
"@typescript-eslint/parser": "^7.12.0",
"eslint": "^9.4.0",
"eslint-config-prettier": "^9.1.0",
"knex": "^2.4.1",
"mixpanel": "^0.17.0",
"pino": "^8.7.0",
"pino-http": "^8.0.0",
@@ -0,0 +1,154 @@
import { z } from 'zod'
import fs from 'node:fs/promises'
import { Knex, knex } from 'knex'
import { Logger } from 'pino'
export const regionConfigSchema = z.object({
postgres: z.object({
connectionUri: z
.string()
.describe(
'Full Postgres connection URI (e.g. "postgres://user:password@host:port/dbname")'
),
privateConnectionUri: z
.string()
.describe(
'Full Postgres connection URI in VPN or Docker networks (e.g. "postgres://user:password@host:port/dbname")'
)
.optional(),
publicTlsCertificate: z
.string()
.describe('Public TLS ("CA") certificate for the Postgres server')
.optional()
})
//TODO - add the rest of the config when blob storage is implemented
// blobStorage: z
// .object({
// endpoint: z.string().url(),
// accessKey: z.string(),
// secretKey: z.string(),
// bucket: z.string()
// })
})
export const multiRegionConfigSchema = z.object({
main: regionConfigSchema,
regions: z.record(z.string(), regionConfigSchema)
})
export type MultiRegionConfig = z.infer<typeof multiRegionConfigSchema>
export type MainRegionConfig = MultiRegionConfig['main']
export type DataRegionsConfig = MultiRegionConfig['regions']
export type RegionServerConfig = z.infer<typeof regionConfigSchema>
export const loadMultiRegionsConfig = async ({
path
}: {
path: string
}): Promise<MultiRegionConfig> => {
let file: string
try {
file = await fs.readFile(path, 'utf-8')
} catch (e) {
if (e instanceof Error && 'code' in e && e.code === 'ENOENT') {
throw new Error(`Multi-region config file not found at path: ${path}`)
}
throw e
}
let parsedJson: Record<string, unknown>
try {
parsedJson = JSON.parse(file) as Record<string, unknown> // This will throw if the file is not valid JSON
} catch (e) {
throw new Error(`Multi-region config file at path '${path}' is not valid JSON`)
}
const multiRegionConfigFileResult = multiRegionConfigSchema.safeParse(parsedJson) // This will throw if the config is invalid
if (!multiRegionConfigFileResult.success)
throw new Error(
`Multi-region config file at path '${path}' does not fit the schema: ${multiRegionConfigFileResult.error}`
)
return multiRegionConfigFileResult.data
}
export type KnexConfigArgs = {
migrationDirs: string[]
isTestEnv: boolean
isDevOrTestEnv: boolean
logger: Logger
maxConnections: number
applicationName: string
}
export const createKnexConfig = ({
connectionString,
migrationDirs,
isTestEnv,
isDevOrTestEnv,
logger,
maxConnections,
caCertificate
}: {
connectionString?: string | undefined
caCertificate?: string | undefined
} & KnexConfigArgs): Knex.Config => {
return {
client: 'pg',
migrations: {
extension: 'ts',
loadExtensions: isTestEnv ? ['.js', '.ts'] : ['.js'],
directory: migrationDirs
},
log: {
warn(message: unknown) {
logger.warn(message)
},
error(message: unknown) {
logger.error(message)
},
deprecate(message: unknown) {
logger.info(message)
},
debug(message: unknown) {
logger.debug(message)
}
},
connection: {
connectionString,
ssl: caCertificate ? { ca: caCertificate, rejectUnauthorized: true } : undefined,
// eslint-disable-next-line camelcase
application_name: 'speckle_server'
},
// we wish to avoid leaking sql queries in the logs: https://knexjs.org/guide/#compilesqlonerror
compileSqlOnError: isDevOrTestEnv,
asyncStackTraces: isDevOrTestEnv,
pool: {
min: 0,
max: maxConnections,
acquireTimeoutMillis: 16000, //allows for 3x creation attempts plus idle time between attempts
createTimeoutMillis: 5000
}
}
}
export const configureKnexClient = (
config: RegionServerConfig,
configArgs: KnexConfigArgs
): { public: Knex; private?: Knex } => {
const knexConfig = createKnexConfig({
connectionString: config.postgres.connectionUri,
caCertificate: config.postgres.publicTlsCertificate,
...configArgs
})
const privateConfig = config.postgres.privateConnectionUri
? knex(
createKnexConfig({
connectionString: config.postgres.privateConnectionUri,
caCertificate: config.postgres.publicTlsCertificate,
...configArgs
})
)
: undefined
return { public: knex(knexConfig), private: privateConfig }
}
+16
View File
@@ -0,0 +1,16 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Launch Webhook service",
"type": "node",
"request": "launch",
"runtimeExecutable": "yarn",
"runtimeArgs": ["dev"],
"skipFiles": ["<node_internals>/**"],
"env": {
"FF_WORKSPACES_MULTI_REGION_ENABLED": "false"
}
}
]
}
+3 -1
View File
@@ -30,7 +30,9 @@
"pino-pretty": "^9.1.1",
"private-ip": "^2.3.3",
"prom-client": "^14.0.1",
"verror": "^1.10.1"
"verror": "^1.10.1",
"znv": "^0.4.0",
"zod": "^3.22.4"
},
"devDependencies": {
"cross-env": "^7.0.3",
+48 -15
View File
@@ -1,18 +1,51 @@
/* eslint-disable camelcase */
'use strict'
const Environment = require('@speckle/shared/dist/commonjs/environment/index.js')
const {
loadMultiRegionsConfig,
configureKnexClient
} = require('@speckle/shared/dist/commonjs/environment/multiRegionConfig.js')
const { logger } = require('./observability/logging')
module.exports = require('knex')({
client: 'pg',
connection: {
application_name: 'speckle_webhook_service',
connectionString:
process.env.PG_CONNECTION_STRING || 'postgres://speckle:speckle@127.0.0.1/speckle'
},
pool: {
min: 0,
max: parseInt(process.env.POSTGRES_MAX_CONNECTIONS_WEBHOOK_SERVICE) || 1,
acquireTimeoutMillis: 16000, //allows for 3x creation attempts plus idle time between attempts
createTimeoutMillis: 5000
const { FF_WORKSPACES_MULTI_REGION_ENABLED } = Environment.getFeatureFlags()
const isDevEnv = process.env.NODE_ENV !== 'production'
let dbClients
const getDbClients = async () => {
if (dbClients) return dbClients
const maxConnections =
parseInt(process.env.POSTGRES_MAX_CONNECTIONS_WEBHOOK_SERVICE) || 1
const configArgs = {
migrationDirs: [],
isTestEnv: isDevEnv,
isDevOrTestEnv: isDevEnv,
logger,
maxConnections,
applicationName: 'speckle_webhook_service'
}
// migrations are in managed in the server package
})
if (!FF_WORKSPACES_MULTI_REGION_ENABLED) {
const mainClient = configureKnexClient(
{
postgres: {
connectionUri:
process.env.PG_CONNECTION_STRING ||
'postgres://speckle:speckle@127.0.0.1/speckle'
}
},
configArgs
)
dbClients = { main: mainClient }
} else {
const configPath = process.env.MULTI_REGION_CONFIG_PATH || 'multiregion.json'
const config = await loadMultiRegionsConfig({ path: configPath })
const clients = [['main', configureKnexClient(config.main, configArgs)]]
Object.entries(config.regions).map(([key, config]) => {
clients.push([key, configureKnexClient(config, configArgs)])
})
dbClients = Object.fromEntries(clients)
}
return dbClients
}
module.exports = getDbClients
+104 -108
View File
@@ -1,7 +1,7 @@
'use strict'
const crypto = require('crypto')
const knex = require('./knex')
const getDbClients = require('./knex')
const fs = require('fs')
const metrics = require('./observability/prometheusMetrics')
const { logger } = require('./observability/logging')
@@ -12,10 +12,8 @@ const HEALTHCHECK_FILE_PATH = '/tmp/last_successful_query'
const { makeNetworkRequest } = require('./webhookCaller')
const WebhookError = require('./errors')
const startTaskFactory =
({ db }) =>
async () => {
const { rows } = await db.raw(`
const startTask = async (db) => {
const { rows } = await db.raw(`
UPDATE webhooks_events
SET
"status" = 1,
@@ -29,16 +27,14 @@ const startTaskFactory =
WHERE webhooks_events."id" = task."id"
RETURNING webhooks_events."id"
`)
return rows[0]
}
return rows[0]
}
const doTaskFactory =
({ db }) =>
async (task) => {
let boundLogger = logger.child({ taskId: task.id })
try {
const { rows } = await db.raw(
`
const doTask = async (db, task) => {
let boundLogger = logger.child({ taskId: task.id })
try {
const { rows } = await db.raw(
`
SELECT
ev.payload as evt,
cnf.id as wh_id, cnf.url as wh_url, cnf.secret as wh_secret, cnf.enabled as wh_enabled
@@ -47,49 +43,49 @@ const doTaskFactory =
WHERE ev.id = ?
LIMIT 1
`,
[task.id]
[task.id]
)
const info = rows[0]
if (!info) {
throw new Error('Internal error: DB inconsistent')
}
boundLogger = boundLogger.child({ webhookId: info.wh_id })
const fullPayload = JSON.parse(info.evt)
boundLogger = boundLogger.child({
streamId: fullPayload.streamId,
eventName: fullPayload.event.event_name
})
const postData = { payload: fullPayload }
const signature = crypto
.createHmac('sha256', info.wh_secret || '')
.update(JSON.stringify(postData))
.digest('hex')
const postHeaders = { 'X-WEBHOOK-SIGNATURE': signature }
boundLogger.info('Calling webhook.')
const result = await makeNetworkRequest({
url: info.wh_url,
data: postData,
headersData: postHeaders,
logger: boundLogger
})
boundLogger.info({ result }, `Received response from webhook.`)
if (!result.success) {
throw new WebhookError(
result.error,
'Calling webhook was unsuccessful.',
result.responseCode,
result.responseBody
)
const info = rows[0]
if (!info) {
throw new Error('Internal error: DB inconsistent')
}
boundLogger = boundLogger.child({ webhookId: info.wh_id })
}
const fullPayload = JSON.parse(info.evt)
boundLogger = boundLogger.child({
streamId: fullPayload.streamId,
eventName: fullPayload.event.event_name
})
const postData = { payload: fullPayload }
const signature = crypto
.createHmac('sha256', info.wh_secret || '')
.update(JSON.stringify(postData))
.digest('hex')
const postHeaders = { 'X-WEBHOOK-SIGNATURE': signature }
boundLogger.info('Calling webhook.')
const result = await makeNetworkRequest({
url: info.wh_url,
data: postData,
headersData: postHeaders,
logger: boundLogger
})
boundLogger.info({ result }, `Received response from webhook.`)
if (!result.success) {
throw new WebhookError(
result.error,
'Calling webhook was unsuccessful.',
result.responseCode,
result.responseBody
)
}
await db.raw(
`
await db.raw(
`
UPDATE webhooks_events
SET
"status" = 2,
@@ -97,18 +93,18 @@ const doTaskFactory =
"statusInfo" = 'Webhook called'
WHERE "id" = ?
`,
[task.id]
)
} catch (err) {
switch (err.constructor) {
case WebhookError:
boundLogger.warn({ err }, 'Failed to trigger webhook event.')
break
default:
boundLogger.error(err, 'Failed to trigger webhook event.')
}
await db.raw(
`
[task.id]
)
} catch (err) {
switch (err.constructor) {
case WebhookError:
boundLogger.warn({ err }, 'Failed to trigger webhook event.')
break
default:
boundLogger.error(err, 'Failed to trigger webhook event.')
}
await db.raw(
`
UPDATE webhooks_events
SET
"status" = 3,
@@ -116,43 +112,46 @@ const doTaskFactory =
"statusInfo" = ?
WHERE "id" = ?
`,
[err.toString(), task.id]
[err.toString(), task.id]
)
metrics.metricOperationErrors.labels('webhook').inc()
}
}
const doStuff = async (dbClients) => {
while (!shouldExit) {
const tasks = (
await Promise.all(
dbClients.map(async (db) => {
fs.writeFile(HEALTHCHECK_FILE_PATH, '' + Date.now(), () => {})
const task = await startTask(db)
if (!task) return
return [db, task]
})
)
metrics.metricOperationErrors.labels('webhook').inc()
}
}
const tickFactory =
({ doTask, startTask, tick }) =>
async () => {
if (shouldExit) {
process.exit(0)
}
try {
const task = await startTask()
fs.writeFile(HEALTHCHECK_FILE_PATH, '' + Date.now(), () => {})
if (!task) {
setTimeout(tick, 1000)
return
}
const metricDurationEnd = metrics.metricDuration.startTimer()
await doTask(task)
metricDurationEnd({ op: 'webhook' })
// Check for another task very soon
setTimeout(tick, 10)
} catch (err) {
metrics.metricOperationErrors.labels('main_loop').inc()
logger.error(err, 'Error executing task')
setTimeout(tick, 5000)
).filter((t) => t)
if (!tasks.length) {
await new Promise((r) => setTimeout(r, 1000))
continue
}
await Promise.all(
tasks.map(async ([db, task]) => {
try {
const metricDurationEnd = metrics.metricDuration.startTimer()
await doTask(db, task)
metricDurationEnd({ op: 'webhook' })
} catch (err) {
metrics.metricOperationErrors.labels('main_loop').inc()
logger.error(err, 'Error executing task')
}
})
)
}
process.exit(0)
}
async function main() {
logger.info('Starting Webhook Service...')
@@ -161,14 +160,11 @@ async function main() {
shouldExit = true
logger.info('Shutting down...')
})
metrics.initPrometheusMetrics()
await metrics.initPrometheusMetrics()
const tick = tickFactory({
doTask: doTaskFactory({ db: knex }),
startTask: startTaskFactory({ db: knex }),
tick: (...args) => tick(...args)
})
tick()
const dbClients = Object.values(await getDbClients()).map((client) => client.public)
await doStuff(dbClients)
}
main()
@@ -3,7 +3,7 @@
const http = require('http')
const prometheusClient = require('prom-client')
const knex = require('../knex')
const getDbClients = require('../knex')
let metricFree = null
let metricUsed = null
@@ -24,7 +24,8 @@ prometheusClient.collectDefaultMetrics()
let prometheusInitialized = false
function initKnexPrometheusMetrics() {
async function initKnexPrometheusMetrics() {
const knex = (await getDbClients()).main.public
metricFree = new prometheusClient.Gauge({
name: 'speckle_server_knex_free',
help: 'Number of free DB connections',
@@ -114,11 +115,11 @@ function initKnexPrometheusMetrics() {
}
module.exports = {
initPrometheusMetrics() {
async initPrometheusMetrics() {
if (prometheusInitialized) return
prometheusInitialized = true
initKnexPrometheusMetrics()
await initKnexPrometheusMetrics()
// Define the HTTP server
const server = http.createServer(async (req, res) => {
+9 -2
View File
@@ -16593,6 +16593,8 @@ __metadata:
undici: "npm:^5.28.4"
valid-filename: "npm:^3.1.0"
web-ifc: "npm:^0.0.36"
znv: "npm:^0.4.0"
zod: "npm:^3.22.4"
languageName: unknown
linkType: soft
@@ -16930,7 +16932,8 @@ __metadata:
webpack-dev-server: "npm:^4.6.0"
yargs: "npm:^17.3.0"
zlib: "npm:^1.0.5"
zod: "npm:^3.23.8"
znv: "npm:^0.4.0"
zod: "npm:^3.22.4"
languageName: unknown
linkType: soft
@@ -17134,6 +17137,7 @@ __metadata:
"@typescript-eslint/parser": "npm:^7.12.0"
eslint: "npm:^9.4.0"
eslint-config-prettier: "npm:^9.1.0"
knex: "npm:^2.4.1"
lodash: "npm:^4.17.21"
lodash-es: "npm:^4.17.21"
mixpanel: "npm:^0.17.0"
@@ -17147,6 +17151,7 @@ __metadata:
zod: "npm:^3.22.4"
peerDependencies:
"@tiptap/core": ^2.0.0-beta.176
knex: "*"
mixpanel: ^0.17.0
pino: ^8.7.0
pino-http: ^8.0.0
@@ -17357,6 +17362,8 @@ __metadata:
private-ip: "npm:^2.3.3"
prom-client: "npm:^14.0.1"
verror: "npm:^1.10.1"
znv: "npm:^0.4.0"
zod: "npm:^3.22.4"
languageName: unknown
linkType: soft
@@ -54645,7 +54652,7 @@ __metadata:
languageName: node
linkType: hard
"zod@npm:3.23.8, zod@npm:^3.23.8":
"zod@npm:3.23.8":
version: 3.23.8
resolution: "zod@npm:3.23.8"
checksum: 10/846fd73e1af0def79c19d510ea9e4a795544a67d5b34b7e1c4d0425bf6bfd1c719446d94cdfa1721c1987d891321d61f779e8236fde517dc0e524aa851a6eff1