From 4d5f96bb8ecd72731560d4dba3012da4edbb1bf5 Mon Sep 17 00:00:00 2001 From: Chuck Driesler Date: Sun, 12 Jan 2025 00:03:14 +0000 Subject: [PATCH] feat(regions): repo functions for copying project branches and commits --- .../typedefs/workspaces.graphql | 1 + .../modules/core/repositories/objects.ts | 882 +++++++++--------- .../server/modules/shared/helpers/dbHelper.ts | 4 +- .../modules/workspaces/domain/operations.ts | 8 + .../workspaces/repositories/regions.ts | 174 +++- .../modules/workspaces/services/projects.ts | 277 +++--- 6 files changed, 751 insertions(+), 595 deletions(-) diff --git a/packages/server/assets/workspacesCore/typedefs/workspaces.graphql b/packages/server/assets/workspacesCore/typedefs/workspaces.graphql index 226b28a18..8c5d69997 100644 --- a/packages/server/assets/workspacesCore/typedefs/workspaces.graphql +++ b/packages/server/assets/workspacesCore/typedefs/workspaces.graphql @@ -170,6 +170,7 @@ type WorkspaceProjectMutations { updateRole(input: ProjectUpdateRoleInput!): Project! @hasStreamRole(role: STREAM_OWNER) @hasWorkspaceRole(role: MEMBER) + moveToRegion(projectId: String!, regionKey: String!): Project! moveToWorkspace(projectId: String!, workspaceId: String!): Project! create(input: WorkspaceProjectCreateInput!): Project! } diff --git a/packages/server/modules/core/repositories/objects.ts b/packages/server/modules/core/repositories/objects.ts index 851bf78fa..bdf8a82ef 100644 --- a/packages/server/modules/core/repositories/objects.ts +++ b/packages/server/modules/core/repositories/objects.ts @@ -40,298 +40,105 @@ const tables = { export const getStreamObjectsFactory = (deps: { db: Knex }): GetStreamObjects => - async (streamId: string, objectIds: string[]): Promise => { - if (!objectIds?.length) return [] + async (streamId: string, objectIds: string[]): Promise => { + if (!objectIds?.length) return [] - const q = tables - .objects(deps.db) - .where(Objects.col.streamId, streamId) - .whereIn(Objects.col.id, objectIds) + const q = tables + .objects(deps.db) + .where(Objects.col.streamId, streamId) + .whereIn(Objects.col.id, objectIds) - return await q - } + return await q + } export const getObjectFactory = (deps: { db: Knex }): GetObject => - async (objectId: string, streamId: string): Promise> => { - return await tables - .objects(deps.db) - .where(Objects.col.id, objectId) - .andWhere(Objects.col.streamId, streamId) - .first() - } + async (objectId: string, streamId: string): Promise> => { + return await tables + .objects(deps.db) + .where(Objects.col.id, objectId) + .andWhere(Objects.col.streamId, streamId) + .first() + } export const getFormattedObjectFactory = (deps: { db: Knex }): GetFormattedObject => - async ({ streamId, objectId }) => { - const res = await tables - .objects(deps.db) - .where({ streamId, id: objectId }) - .select('*') - .first() - if (!res) return null + async ({ streamId, objectId }) => { + const res = await tables + .objects(deps.db) + .where({ streamId, id: objectId }) + .select('*') + .first() + if (!res) return null - // TODO: Why tho? A lot if not most of places already just use getObjectFactory, - const finalRes: SetOptional = res - if (finalRes.data) finalRes.data.totalChildrenCount = res.totalChildrenCount // move this back - delete finalRes.streamId // backwards compatibility + // TODO: Why tho? A lot if not most of places already just use getObjectFactory, + const finalRes: SetOptional = res + if (finalRes.data) finalRes.data.totalChildrenCount = res.totalChildrenCount // move this back + delete finalRes.streamId // backwards compatibility - return finalRes - } + return finalRes + } export const getBatchedStreamObjectsFactory = (deps: { db: Knex }): GetBatchedStreamObjects => - (streamId: string, options?: Partial) => { - const baseQuery = tables - .objects(deps.db) - .select('*') - .where(Objects.col.streamId, streamId) - .orderBy(Objects.col.id) + (streamId: string, options?: Partial) => { + const baseQuery = tables + .objects(deps.db) + .select('*') + .where(Objects.col.streamId, streamId) + .orderBy(Objects.col.id) - return executeBatchedSelect(baseQuery, options) - } + return executeBatchedSelect(baseQuery, options) + } export const insertObjectsFactory = (deps: { db: Knex }): StoreObjects => - async (objects: ObjectRecord[], options?: Partial<{ trx: Knex.Transaction }>) => { - const q = tables.objects(deps.db).insert(objects) - if (options?.trx) q.transacting(options.trx) - return await q - } + async (objects: ObjectRecord[], options?: Partial<{ trx: Knex.Transaction }>) => { + const q = tables.objects(deps.db).insert(objects) + if (options?.trx) q.transacting(options.trx) + return await q + } export const storeSingleObjectIfNotFoundFactory = (deps: { db: Knex }): StoreSingleObjectIfNotFound => - async (insertionObject) => { - await tables - .objects(deps.db) - .insert( - // knex is bothered by string being inserted into jsonb, which is actually fine - insertionObject as SpeckleObject - ) - .onConflict() - .ignore() - } + async (insertionObject) => { + await tables + .objects(deps.db) + .insert( + // knex is bothered by string being inserted into jsonb, which is actually fine + insertionObject as SpeckleObject + ) + .onConflict() + .ignore() + } export const storeObjectsIfNotFoundFactory = (deps: { db: Knex }): StoreObjectsIfNotFound => - async (batch) => { - await tables - .objects(deps.db) - .insert( - // knex is bothered by string being inserted into jsonb, which is actually fine - batch as SpeckleObject[] - ) - .onConflict() - .ignore() - } + async (batch) => { + await tables + .objects(deps.db) + .insert( + // knex is bothered by string being inserted into jsonb, which is actually fine + batch as SpeckleObject[] + ) + .onConflict() + .ignore() + } export const storeClosuresIfNotFoundFactory = (deps: { db: Knex }): StoreClosuresIfNotFound => - async (closuresBatch) => { - await tables - .objectChildrenClosure(deps.db) - .insert(closuresBatch) - .onConflict() - .ignore() - } + async (closuresBatch) => { + await tables + .objectChildrenClosure(deps.db) + .insert(closuresBatch) + .onConflict() + .ignore() + } export const getObjectChildrenStreamFactory = (deps: { db: Knex }): GetObjectChildrenStream => - async ({ streamId, objectId }) => { - const q = deps.db.with( - 'object_children_closure', - knex.raw( - `SELECT objects.id as parent, d.key as child, d.value as mindepth, ? as "streamId" - FROM objects - JOIN jsonb_each_text(objects.data->'__closure') d ON true - where objects.id = ?`, - [streamId, objectId] - ) - ) - q.select('id') - q.select(knex.raw('data::text as "dataText"')) - q.from('object_children_closure') - - q.rightJoin('objects', function () { - this.on('objects.streamId', '=', 'object_children_closure.streamId').andOn( - 'objects.id', - '=', - 'object_children_closure.child' - ) - }) - .where( - knex.raw('object_children_closure."streamId" = ? AND parent = ?', [ - streamId, - objectId - ]) - ) - .orderBy('objects.id') - return q.stream({ highWaterMark: 500 }) - } - -export const getObjectsStreamFactory = - (deps: { db: Knex }): GetObjectsStream => - async ({ streamId, objectIds }) => { - const res = tables - .objects(deps.db) - .whereIn('id', objectIds) - .andWhere('streamId', streamId) - .orderBy('id') - .select( - knex.raw( - '"id", "speckleType", "totalChildrenCount", "totalChildrenCountByDepth", "createdAt", data::text as "dataText"' - ) - ) - return res.stream({ highWaterMark: 500 }) - } - -export const hasObjectsFactory = - (deps: { db: Knex }): HasObjects => - async ({ streamId, objectIds }) => { - const dbRes = await tables - .objects(deps.db) - .whereIn('id', objectIds) - .andWhere('streamId', streamId) - .select('id') - - const res: Record = {} - // eslint-disable-next-line @typescript-eslint/no-for-in-array - for (const i in objectIds) { - res[objectIds[i]] = false - } - // eslint-disable-next-line @typescript-eslint/no-for-in-array - for (const i in dbRes) { - res[dbRes[i].id] = true - } - return res - } - -export const getObjectChildrenFactory = - (deps: { db: Knex }): GetObjectChildren => - async ({ streamId, objectId, limit, depth, select, cursor }) => { - limit = toNumber(limit || 0) || 50 - depth = toNumber(depth || 0) || 1000 - - let fullObjectSelect = false - - const q = deps.db.with( - 'object_children_closure', - knex.raw( - `SELECT objects.id as parent, d.key as child, d.value as mindepth, ? as "streamId" - FROM objects - JOIN jsonb_each_text(objects.data->'__closure') d ON true - where objects.id = ?`, - [streamId, objectId] - ) - ) - - if (Array.isArray(select)) { - select.forEach((field, index) => { - q.select( - knex.raw('jsonb_path_query(data, :path) as :name:', { - path: '$.' + field, - name: '' + index - }) - ) - }) - } else { - fullObjectSelect = true - q.select('data') - } - - q.select('id') - q.select('createdAt') - q.select('speckleType') - q.select('totalChildrenCount') - - q.from('object_children_closure') - - q.rightJoin('objects', function () { - this.on('objects.streamId', '=', 'object_children_closure.streamId').andOn( - 'objects.id', - '=', - 'object_children_closure.child' - ) - }) - .where( - knex.raw('object_children_closure."streamId" = ? AND parent = ?', [ - streamId, - objectId - ]) - ) - .andWhere(knex.raw('object_children_closure.mindepth < ?', [depth])) - .andWhere(knex.raw('id > ?', [cursor ? cursor : '0'])) - .orderBy('objects.id') - .limit(limit) - - const rows = await q - - if (rows.length === 0) { - return { objects: rows, cursor: null } - } - - if (!fullObjectSelect) - rows.forEach((o, i, arr) => { - const no = { - id: o.id, - createdAt: o.createdAt, - speckleType: o.speckleType, - totalChildrenCount: o.totalChildrenCount, - data: {} - } - let k = 0 - for (const field of select || []) { - set(no.data, field, o[k++]) - } - arr[i] = no - }) - - const lastId = rows[rows.length - 1].id - return { objects: rows, cursor: lastId } - } - -/** - * This query is inefficient on larger sets (n * 10k objects) as we need to return the total count on an arbitrarily (user) defined selection of objects. - * A possible future optimisation route would be to cache the total count of a query (as objects are immutable, it will not change) on a first run, and, if found on a subsequent round, do a simpler query and merge the total count result. - */ -export const getObjectChildrenQueryFactory = - (deps: { db: Knex }): GetObjectChildrenQuery => - async (params) => { - const { streamId, objectId, select, query } = params - - const limit = toNumber(params.limit || 0) || 50 - const depth = toNumber(params.depth || 0) || 1000 - const orderBy = params.orderBy || { field: 'id', direction: 'asc' } - - // Cursors received by this service should be base64 encoded. They are generated on first entry query by this service; They should never be client-side generated. - const cursor: Optional<{ - value: unknown - operator: string - field: string - lastSeenId?: string - }> = params.cursor - ? JSON.parse(Buffer.from(params.cursor, 'base64').toString('binary')) - : undefined - - // Flag that keeps track of whether we select the whole "data" part of an object or not - let fullObjectSelect = false - if (Array.isArray(select)) { - // if we order by a field that we do not select, select it! - if (orderBy && select.indexOf(orderBy.field) === -1) { - select.push(orderBy.field) - } - // // always add the id! - // if ( select.indexOf( 'id' ) === -1 ) select.unshift( 'id' ) - } else { - fullObjectSelect = true - } - - const additionalIdOrderBy = orderBy.field !== 'id' - - const operatorsWhitelist = ['=', '>', '>=', '<', '<=', '!='] - - const mainQuery = deps.db - .with( + async ({ streamId, objectId }) => { + const q = deps.db.with( 'object_children_closure', knex.raw( `SELECT objects.id as parent, d.key as child, d.value as mindepth, ? as "streamId" @@ -341,201 +148,394 @@ export const getObjectChildrenQueryFactory = [streamId, objectId] ) ) - .with('objs', (cteInnerQuery) => { - // always select the id - cteInnerQuery.select('id').from('object_children_closure') - cteInnerQuery.select('createdAt') - cteInnerQuery.select('speckleType') - cteInnerQuery.select('totalChildrenCount') + q.select('id') + q.select(knex.raw('data::text as "dataText"')) + q.from('object_children_closure') - // if there are any select fields, add them - if (Array.isArray(select)) { - select.forEach((field, index) => { - cteInnerQuery.select( - knex.raw('jsonb_path_query(data, :path) as :name:', { - path: '$.' + field, - name: '' + index - }) - ) - }) - // otherwise, get the whole object, as stored in the jsonb column - } else { - cteInnerQuery.select('data') + q.rightJoin('objects', function () { + this.on('objects.streamId', '=', 'object_children_closure.streamId').andOn( + 'objects.id', + '=', + 'object_children_closure.child' + ) + }) + .where( + knex.raw('object_children_closure."streamId" = ? AND parent = ?', [ + streamId, + objectId + ]) + ) + .orderBy('objects.id') + return q.stream({ highWaterMark: 500 }) + } + +export const getObjectsStreamFactory = + (deps: { db: Knex }): GetObjectsStream => + async ({ streamId, objectIds }) => { + const res = tables + .objects(deps.db) + .whereIn('id', objectIds) + .andWhere('streamId', streamId) + .orderBy('id') + .select( + knex.raw( + '"id", "speckleType", "totalChildrenCount", "totalChildrenCountByDepth", "createdAt", data::text as "dataText"' + ) + ) + return res.stream({ highWaterMark: 500 }) + } + +export const hasObjectsFactory = + (deps: { db: Knex }): HasObjects => + async ({ streamId, objectIds }) => { + const dbRes = await tables + .objects(deps.db) + .whereIn('id', objectIds) + .andWhere('streamId', streamId) + .select('id') + + const res: Record = {} + // eslint-disable-next-line @typescript-eslint/no-for-in-array + for (const i in objectIds) { + res[objectIds[i]] = false + } + // eslint-disable-next-line @typescript-eslint/no-for-in-array + for (const i in dbRes) { + res[dbRes[i].id] = true + } + return res + } + +export const getObjectChildrenFactory = + (deps: { db: Knex }): GetObjectChildren => + async ({ streamId, objectId, limit, depth, select, cursor }) => { + limit = toNumber(limit || 0) || 50 + depth = toNumber(depth || 0) || 1000 + + let fullObjectSelect = false + + const q = deps.db.with( + 'object_children_closure', + knex.raw( + `SELECT objects.id as parent, d.key as child, d.value as mindepth, ? as "streamId" + FROM objects + JOIN jsonb_each_text(objects.data->'__closure') d ON true + where objects.id = ?`, + [streamId, objectId] + ) + ) + + if (Array.isArray(select)) { + select.forEach((field, index) => { + q.select( + knex.raw('jsonb_path_query(data, :path) as :name:', { + path: '$.' + field, + name: '' + index + }) + ) + }) + } else { + fullObjectSelect = true + q.select('data') + } + + q.select('id') + q.select('createdAt') + q.select('speckleType') + q.select('totalChildrenCount') + + q.from('object_children_closure') + + q.rightJoin('objects', function () { + this.on('objects.streamId', '=', 'object_children_closure.streamId').andOn( + 'objects.id', + '=', + 'object_children_closure.child' + ) + }) + .where( + knex.raw('object_children_closure."streamId" = ? AND parent = ?', [ + streamId, + objectId + ]) + ) + .andWhere(knex.raw('object_children_closure.mindepth < ?', [depth])) + .andWhere(knex.raw('id > ?', [cursor ? cursor : '0'])) + .orderBy('objects.id') + .limit(limit) + + const rows = await q + + if (rows.length === 0) { + return { objects: rows, cursor: null } + } + + if (!fullObjectSelect) + rows.forEach((o, i, arr) => { + const no = { + id: o.id, + createdAt: o.createdAt, + speckleType: o.speckleType, + totalChildrenCount: o.totalChildrenCount, + data: {} + } + let k = 0 + for (const field of select || []) { + set(no.data, field, o[k++]) + } + arr[i] = no + }) + + const lastId = rows[rows.length - 1].id + return { objects: rows, cursor: lastId } + } + +/** + * This query is inefficient on larger sets (n * 10k objects) as we need to return the total count on an arbitrarily (user) defined selection of objects. + * A possible future optimisation route would be to cache the total count of a query (as objects are immutable, it will not change) on a first run, and, if found on a subsequent round, do a simpler query and merge the total count result. + */ +export const getObjectChildrenQueryFactory = + (deps: { db: Knex }): GetObjectChildrenQuery => + async (params) => { + const { streamId, objectId, select, query } = params + + const limit = toNumber(params.limit || 0) || 50 + const depth = toNumber(params.depth || 0) || 1000 + const orderBy = params.orderBy || { field: 'id', direction: 'asc' } + + // Cursors received by this service should be base64 encoded. They are generated on first entry query by this service; They should never be client-side generated. + const cursor: Optional<{ + value: unknown + operator: string + field: string + lastSeenId?: string + }> = params.cursor + ? JSON.parse(Buffer.from(params.cursor, 'base64').toString('binary')) + : undefined + + // Flag that keeps track of whether we select the whole "data" part of an object or not + let fullObjectSelect = false + if (Array.isArray(select)) { + // if we order by a field that we do not select, select it! + if (orderBy && select.indexOf(orderBy.field) === -1) { + select.push(orderBy.field) } + // // always add the id! + // if ( select.indexOf( 'id' ) === -1 ) select.unshift( 'id' ) + } else { + fullObjectSelect = true + } - // join on objects table - cteInnerQuery - .join('objects', function () { - this.on('objects.streamId', '=', 'object_children_closure.streamId').andOn( - 'objects.id', - '=', - 'object_children_closure.child' - ) - }) - .where('object_children_closure.streamId', streamId) - .andWhere('parent', objectId) - .andWhere('mindepth', '<', depth) + const additionalIdOrderBy = orderBy.field !== 'id' - // Add user provided filters/queries. - if (Array.isArray(query) && query.length > 0) { - cteInnerQuery.andWhere((nestedWhereQuery) => { - query.forEach((statement, index) => { - let castType = 'text' - if (typeof statement.value === 'string') castType = 'text' - if (typeof statement.value === 'boolean') castType = 'boolean' - if (typeof statement.value === 'number') castType = 'numeric' + const operatorsWhitelist = ['=', '>', '>=', '<', '<=', '!='] - if (operatorsWhitelist.indexOf(statement.operator) === -1) - throw new Error('Invalid operator for query') + const mainQuery = deps.db + .with( + 'object_children_closure', + knex.raw( + `SELECT objects.id as parent, d.key as child, d.value as mindepth, ? as "streamId" + FROM objects + JOIN jsonb_each_text(objects.data->'__closure') d ON true + where objects.id = ?`, + [streamId, objectId] + ) + ) + .with('objs', (cteInnerQuery) => { + // always select the id + cteInnerQuery.select('id').from('object_children_closure') + cteInnerQuery.select('createdAt') + cteInnerQuery.select('speckleType') + cteInnerQuery.select('totalChildrenCount') - // Determine the correct where clause (where, and where, or where) - let whereClause: keyof typeof nestedWhereQuery - if (index === 0) whereClause = 'where' - else if (statement.verb && statement.verb.toLowerCase() === 'or') - whereClause = 'orWhere' - else whereClause = 'andWhere' - - // Note: castType is generated from the statement's value and operators are matched against a whitelist. - // If comparing with strings, the jsonb_path_query(_first) func returns json encoded strings (ie, `bar` is actually `"bar"`), hence we need to add the quotes manually to the raw provided comparison value. - nestedWhereQuery[whereClause]( - knex.raw( - `jsonb_path_query_first( data, ? )::${castType} ${statement.operator} ? `, - [ - '$.' + statement.field, - castType === 'text' ? `"${statement.value}"` : statement.value - ] - ) + // if there are any select fields, add them + if (Array.isArray(select)) { + select.forEach((field, index) => { + cteInnerQuery.select( + knex.raw('jsonb_path_query(data, :path) as :name:', { + path: '$.' + field, + name: '' + index + }) ) }) - }) - } + // otherwise, get the whole object, as stored in the jsonb column + } else { + cteInnerQuery.select('data') + } - // Order by clause; validate direction! - const direction = - orderBy.direction && orderBy.direction.toLowerCase() === 'desc' - ? 'desc' - : 'asc' - if (orderBy.field === 'id') { - cteInnerQuery.orderBy('id', direction) + // join on objects table + cteInnerQuery + .join('objects', function () { + this.on('objects.streamId', '=', 'object_children_closure.streamId').andOn( + 'objects.id', + '=', + 'object_children_closure.child' + ) + }) + .where('object_children_closure.streamId', streamId) + .andWhere('parent', objectId) + .andWhere('mindepth', '<', depth) + + // Add user provided filters/queries. + if (Array.isArray(query) && query.length > 0) { + cteInnerQuery.andWhere((nestedWhereQuery) => { + query.forEach((statement, index) => { + let castType = 'text' + if (typeof statement.value === 'string') castType = 'text' + if (typeof statement.value === 'boolean') castType = 'boolean' + if (typeof statement.value === 'number') castType = 'numeric' + + if (operatorsWhitelist.indexOf(statement.operator) === -1) + throw new Error('Invalid operator for query') + + // Determine the correct where clause (where, and where, or where) + let whereClause: keyof typeof nestedWhereQuery + if (index === 0) whereClause = 'where' + else if (statement.verb && statement.verb.toLowerCase() === 'or') + whereClause = 'orWhere' + else whereClause = 'andWhere' + + // Note: castType is generated from the statement's value and operators are matched against a whitelist. + // If comparing with strings, the jsonb_path_query(_first) func returns json encoded strings (ie, `bar` is actually `"bar"`), hence we need to add the quotes manually to the raw provided comparison value. + nestedWhereQuery[whereClause]( + knex.raw( + `jsonb_path_query_first( data, ? )::${castType} ${statement.operator} ? `, + [ + '$.' + statement.field, + castType === 'text' ? `"${statement.value}"` : statement.value + ] + ) + ) + }) + }) + } + + // Order by clause; validate direction! + const direction = + orderBy.direction && orderBy.direction.toLowerCase() === 'desc' + ? 'desc' + : 'asc' + if (orderBy.field === 'id') { + cteInnerQuery.orderBy('id', direction) + } else { + cteInnerQuery.orderByRaw( + knex.raw(`jsonb_path_query_first( data, ? ) ${direction}, id asc`, [ + '$.' + orderBy.field + ]) + ) + } + }) + .select('*') + .from('objs') + .joinRaw('RIGHT JOIN ( SELECT count(*) FROM "objs" ) c(total_count) ON TRUE') + + // Set cursor clause, if present. If it's not present, it's an entry query; this method will return a cursor based on its given query. + // We have implemented keyset pagination for more efficient searches on larger sets. This approach depends on an order by value provided by the user and a (hidden) primary key. + // logger.debug( cursor ) + if (cursor) { + let castType = 'text' + if (typeof cursor.value === 'string') castType = 'text' + if (typeof cursor.value === 'boolean') castType = 'boolean' + if (typeof cursor.value === 'number') castType = 'numeric' + + // When strings are used inside an order clause, as mentioned above, we need to add quotes around the comparison value, as the jsonb_path_query funcs return json encoded strings (`{"test":"foo"}` => test is returned as `"foo"`) + if (castType === 'text') cursor.value = `"${cursor.value}"` + + if (operatorsWhitelist.indexOf(cursor.operator) === -1) + throw new Error('Invalid operator for cursor') + + // Unwrapping the tuple comparison of ( userOrderByField, id ) > ( lastValueOfUserOrderBy, lastSeenId ) + if (fullObjectSelect) { + if (cursor.field === 'id') { + mainQuery.where(knex.raw(`id ${cursor.operator} ? `, [cursor.value])) + } else { + mainQuery.where( + knex.raw( + `jsonb_path_query_first( data, ? )::${castType} ${cursor.operator}= ? `, + ['$.' + cursor.field, cursor.value] + ) + ) + } } else { - cteInnerQuery.orderByRaw( - knex.raw(`jsonb_path_query_first( data, ? ) ${direction}, id asc`, [ - '$.' + orderBy.field + mainQuery.where( + knex.raw(`??::${castType} ${cursor.operator}= ? `, [ + (select || []).indexOf(cursor.field).toString(), + cursor.value ]) ) } - }) - .select('*') - .from('objs') - .joinRaw('RIGHT JOIN ( SELECT count(*) FROM "objs" ) c(total_count) ON TRUE') - // Set cursor clause, if present. If it's not present, it's an entry query; this method will return a cursor based on its given query. - // We have implemented keyset pagination for more efficient searches on larger sets. This approach depends on an order by value provided by the user and a (hidden) primary key. - // logger.debug( cursor ) - if (cursor) { - let castType = 'text' - if (typeof cursor.value === 'string') castType = 'text' - if (typeof cursor.value === 'boolean') castType = 'boolean' - if (typeof cursor.value === 'number') castType = 'numeric' - - // When strings are used inside an order clause, as mentioned above, we need to add quotes around the comparison value, as the jsonb_path_query funcs return json encoded strings (`{"test":"foo"}` => test is returned as `"foo"`) - if (castType === 'text') cursor.value = `"${cursor.value}"` - - if (operatorsWhitelist.indexOf(cursor.operator) === -1) - throw new Error('Invalid operator for cursor') - - // Unwrapping the tuple comparison of ( userOrderByField, id ) > ( lastValueOfUserOrderBy, lastSeenId ) - if (fullObjectSelect) { - if (cursor.field === 'id') { - mainQuery.where(knex.raw(`id ${cursor.operator} ? `, [cursor.value])) - } else { - mainQuery.where( - knex.raw( - `jsonb_path_query_first( data, ? )::${castType} ${cursor.operator}= ? `, - ['$.' + cursor.field, cursor.value] - ) - ) + if (cursor.lastSeenId) { + mainQuery.andWhere((qb) => { + // Dunno what the TS issue here is, the JS code itself seemed to work fine as JS + // eslint-disable-next-line @typescript-eslint/no-explicit-any + qb.where('id' as any, '>', cursor!.lastSeenId as any) + if (fullObjectSelect) + qb.orWhere( + knex.raw( + `jsonb_path_query_first( data, ? )::${castType} ${cursor!.operator} ? `, + ['$.' + cursor!.field, cursor!.value] + ) + ) + else + qb.orWhere( + knex.raw(`??::${castType} ${cursor!.operator} ? `, [ + (select || []).indexOf(cursor!.field).toString(), + cursor!.value + ]) + ) + }) } - } else { - mainQuery.where( - knex.raw(`??::${castType} ${cursor.operator}= ? `, [ - (select || []).indexOf(cursor.field).toString(), - cursor.value - ]) - ) } - if (cursor.lastSeenId) { - mainQuery.andWhere((qb) => { - // Dunno what the TS issue here is, the JS code itself seemed to work fine as JS - // eslint-disable-next-line @typescript-eslint/no-explicit-any - qb.where('id' as any, '>', cursor!.lastSeenId as any) - if (fullObjectSelect) - qb.orWhere( - knex.raw( - `jsonb_path_query_first( data, ? )::${castType} ${cursor!.operator} ? `, - ['$.' + cursor!.field, cursor!.value] - ) - ) - else - qb.orWhere( - knex.raw(`??::${castType} ${cursor!.operator} ? `, [ - (select || []).indexOf(cursor!.field).toString(), - cursor!.value - ]) - ) + mainQuery.limit(limit) + // logger.debug( mainQuery.toString() ) + // Finally, execute the query + const rows = await mainQuery + const totalCount = rows && rows.length > 0 ? parseInt(rows[0].total_count) : 0 + + // Return early + if (totalCount === 0) return { totalCount, objects: [], cursor: null } + + // Reconstruct the object based on the provided select paths. + if (!fullObjectSelect) { + rows.forEach((o, i, arr) => { + const no = { + id: o.id, + createdAt: o.createdAt, + speckleType: o.speckleType, + totalChildrenCount: o.totalChildrenCount, + data: {} + } + let k = 0 + for (const field of select || []) { + set(no.data, field, o[k++]) + } + arr[i] = no }) } + + // Assemble the cursor for an eventual next call + const cursorObj: typeof cursor = { + field: cursor?.field || orderBy.field, + operator: + cursor?.operator || + (orderBy.direction && orderBy.direction.toLowerCase() === 'desc' ? '<' : '>'), + value: get(rows[rows.length - 1], `data.${orderBy.field}`) + } + + // If we're not ordering by id (default case, where no order by argument is provided), we need to add the last seen id of this query in order to enable keyset pagination. + if (additionalIdOrderBy) { + cursorObj.lastSeenId = rows[rows.length - 1].id + } + + // Cursor objects should be client-side opaque, hence we encode them to base64. + const cursorEncoded = Buffer.from(JSON.stringify(cursorObj), 'binary').toString( + 'base64' + ) + return { + totalCount, + objects: rows, + cursor: rows.length === limit ? cursorEncoded : null + } } - - mainQuery.limit(limit) - // logger.debug( mainQuery.toString() ) - // Finally, execute the query - const rows = await mainQuery - const totalCount = rows && rows.length > 0 ? parseInt(rows[0].total_count) : 0 - - // Return early - if (totalCount === 0) return { totalCount, objects: [], cursor: null } - - // Reconstruct the object based on the provided select paths. - if (!fullObjectSelect) { - rows.forEach((o, i, arr) => { - const no = { - id: o.id, - createdAt: o.createdAt, - speckleType: o.speckleType, - totalChildrenCount: o.totalChildrenCount, - data: {} - } - let k = 0 - for (const field of select || []) { - set(no.data, field, o[k++]) - } - arr[i] = no - }) - } - - // Assemble the cursor for an eventual next call - const cursorObj: typeof cursor = { - field: cursor?.field || orderBy.field, - operator: - cursor?.operator || - (orderBy.direction && orderBy.direction.toLowerCase() === 'desc' ? '<' : '>'), - value: get(rows[rows.length - 1], `data.${orderBy.field}`) - } - - // If we're not ordering by id (default case, where no order by argument is provided), we need to add the last seen id of this query in order to enable keyset pagination. - if (additionalIdOrderBy) { - cursorObj.lastSeenId = rows[rows.length - 1].id - } - - // Cursor objects should be client-side opaque, hence we encode them to base64. - const cursorEncoded = Buffer.from(JSON.stringify(cursorObj), 'binary').toString( - 'base64' - ) - return { - totalCount, - objects: rows, - cursor: rows.length === limit ? cursorEncoded : null - } - } diff --git a/packages/server/modules/shared/helpers/dbHelper.ts b/packages/server/modules/shared/helpers/dbHelper.ts index 5fa9e8659..d8fac4a0b 100644 --- a/packages/server/modules/shared/helpers/dbHelper.ts +++ b/packages/server/modules/shared/helpers/dbHelper.ts @@ -22,7 +22,7 @@ export async function* executeBatchedSelect< >( selectQuery: Knex.QueryBuilder, options?: Partial -): AsyncGenerator { +): AsyncGenerator, void, unknown> { const { batchSize = 100, trx } = options || {} if (trx) selectQuery.transacting(trx) @@ -33,7 +33,7 @@ export async function* executeBatchedSelect< let currentOffset = 0 while (hasMorePages) { const q = selectQuery.clone().offset(currentOffset) - const results = (await q) as TResult + const results = (await q) as Awaited<(typeof selectQuery)> if (!results.length) { hasMorePages = false diff --git a/packages/server/modules/workspaces/domain/operations.ts b/packages/server/modules/workspaces/domain/operations.ts index 66e39b30f..eb54b5a10 100644 --- a/packages/server/modules/workspaces/domain/operations.ts +++ b/packages/server/modules/workspaces/domain/operations.ts @@ -308,3 +308,11 @@ export type GetWorkspaceCreationState = (params: { export type UpsertWorkspaceCreationState = (params: { workspaceCreationState: WorkspaceCreationState }) => Promise + +/** + * Project regions + */ + +export type CopyProjects = (params: { projectIds: string[] }) => Promise +export type CopyProjectModels = (params: { projectIds: string[] }) => Promise> +export type CopyProjectVersions = (params: { projectIds: string[] }) => Promise> diff --git a/packages/server/modules/workspaces/repositories/regions.ts b/packages/server/modules/workspaces/repositories/regions.ts index 20c267c62..c6688386d 100644 --- a/packages/server/modules/workspaces/repositories/regions.ts +++ b/packages/server/modules/workspaces/repositories/regions.ts @@ -1,7 +1,15 @@ -import { buildTableHelper } from '@/modules/core/dbSchema' +import { BranchCommits, Branches, buildTableHelper, Commits, StreamCommits, StreamFavorites, Streams, StreamsMeta } from '@/modules/core/dbSchema' +import { Branch } from '@/modules/core/domain/branches/types' +import { Commit } from '@/modules/core/domain/commits/types' +import { Stream } from '@/modules/core/domain/streams/types' +import { BranchCommitRecord, StreamCommitRecord, StreamFavoriteRecord } from '@/modules/core/helpers/types' import { RegionRecord } from '@/modules/multiregion/helpers/types' import { Regions } from '@/modules/multiregion/repositories' +import { executeBatchedSelect } from '@/modules/shared/helpers/dbHelper' import { + CopyProjectModels, + CopyProjects, + CopyProjectVersions, GetDefaultRegion, UpsertRegionAssignment } from '@/modules/workspaces/domain/operations' @@ -15,32 +23,156 @@ export const WorkspaceRegions = buildTableHelper('workspace_regions', [ const tables = { workspaceRegions: (db: Knex) => db(WorkspaceRegions.name), - regions: (db: Knex) => db(Regions.name) + regions: (db: Knex) => db(Regions.name), + projects: (db: Knex) => db(Streams.name), + models: (db: Knex) => db(Branches.name), + versions: (db: Knex) => db(Commits.name), + branchCommits: (db: Knex) => db(BranchCommits.name), + streamCommits: (db: Knex) => db(StreamCommits.name), + streamFavorites: (db: Knex) => db(StreamFavorites.name), + streamsMeta: (db: Knex) => db(StreamsMeta.name) } export const upsertRegionAssignmentFactory = (deps: { db: Knex }): UpsertRegionAssignment => - async (params) => { - const { workspaceId, regionKey } = params - const [row] = await tables - .workspaceRegions(deps.db) - .insert({ workspaceId, regionKey }, '*') - .onConflict(['workspaceId', 'regionKey']) - .merge() + async (params) => { + const { workspaceId, regionKey } = params + const [row] = await tables + .workspaceRegions(deps.db) + .insert({ workspaceId, regionKey }, '*') + .onConflict(['workspaceId', 'regionKey']) + .merge() - return row - } + return row + } export const getDefaultRegionFactory = (deps: { db: Knex }): GetDefaultRegion => - async (params) => { - const { workspaceId } = params - const row = await tables - .regions(deps.db) - .select(Regions.cols) - .join(WorkspaceRegions.name, WorkspaceRegions.col.regionKey, Regions.col.key) - .where({ [WorkspaceRegions.col.workspaceId]: workspaceId }) - .first() + async (params) => { + const { workspaceId } = params + const row = await tables + .regions(deps.db) + .select(Regions.cols) + .join(WorkspaceRegions.name, WorkspaceRegions.col.regionKey, Regions.col.key) + .where({ [WorkspaceRegions.col.workspaceId]: workspaceId }) + .first() + + return row + } + + +export const copyProjects = + (deps: { sourceDb: Knex, targetDb: Knex }): CopyProjects => + async ({ projectIds }) => { + const selectProjects = tables.projects(deps.sourceDb).select('*').whereIn(Streams.col.id, projectIds) + const copiedProjectIds: string[] = [] + + // Copy project record + for await (const projects of executeBatchedSelect(selectProjects)) { + for (const project of projects) { + // Store copied project id + copiedProjectIds.push(project.id) + + // Copy `streams` row to target db + await tables.projects(deps.targetDb) + .insert(project) + .onConflict() + .ignore() + } + + const projectIds = projects.map((project) => project.id) + + // Fetch `stream_favorites` rows for projects in batch + const selectStreamFavorites = tables.streamFavorites(deps.sourceDb).select('*').whereIn(StreamFavorites.col.streamId, projectIds) + + for await (const streamFavorites of executeBatchedSelect(selectStreamFavorites)) { + for (const streamFavorite of streamFavorites) { + // Copy `stream_favorites` row to target db + await tables.streamFavorites(deps.targetDb).insert(streamFavorite).onConflict().ignore() + } + } + + // Fetch `streams_meta` rows for projects in batch + const selectStreamsMetadata = tables.streamsMeta(deps.sourceDb).select('*').whereIn(StreamsMeta.col.streamId, projectIds) + + for await (const streamsMetadataBatch of executeBatchedSelect(selectStreamsMetadata)) { + for (const streamMetadata of streamsMetadataBatch) { + // Copy `streams_meta` row to target db + await tables.streamsMeta(deps.targetDb).insert(streamMetadata).onConflict().ignore() + } + } + } + + return copiedProjectIds + } + +export const copyProjectModels = + (deps: { sourceDb: Knex, targetDb: Knex }): CopyProjectModels => + async ({ projectIds }) => { + const copiedModelIds: Record = projectIds.reduce((result, id) => ({ ...result, [id]: [] }), {}) + + for (const projectId of projectIds) { + const selectModels = tables.models(deps.sourceDb).select('*').where({ streamId: projectId }) + + for await (const models of executeBatchedSelect(selectModels)) { + for (const model of models) { + // Store copied model ids + copiedModelIds[projectId].push(model.id) + + // Copy `branches` row to target db + await tables.models(deps.targetDb).insert(model).onConflict().ignore() + } + } + } + + return copiedModelIds + } + +export const copyProjectVersions = + (deps: { sourceDb: Knex, targetDb: Knex }): CopyProjectVersions => + async ({ projectIds }) => { + const copiedVersionIds: Record = projectIds.reduce((result, id) => ({ ...result, [id]: [] }), {}) + + for (const projectId of projectIds) { + const selectVersions = tables.streamCommits(deps.sourceDb).select('*') + .join(Commits.name, Commits.col.id, StreamCommits.col.commitId) + .where({ streamId: projectId }) + + for await (const versions of executeBatchedSelect(selectVersions)) { + for (const version of versions) { + const { commitId, ...commit } = version + + // Store copied version id + copiedVersionIds[projectId].push(commitId) + + // Copy `commits` row to target db + await tables.versions(deps.targetDb).insert(commit).onConflict().ignore() + } + + const commitIds = versions.map((version) => version.commitId) + + // Fetch `branch_commits` rows for versions in batch + const selectBranchCommits = tables.branchCommits(deps.sourceDb).select('*').whereIn(BranchCommits.col.commitId, commitIds) + + for await (const branchCommits of executeBatchedSelect(selectBranchCommits)) { + for (const branchCommit of branchCommits) { + // Copy `branch_commits` row to target db + await tables.branchCommits(deps.targetDb).insert(branchCommit).onConflict().ignore() + } + } + + // Fetch `stream_commits` rows for versions in batch + const selectStreamCommits = tables.streamCommits(deps.sourceDb).select('*').whereIn(StreamCommits.col.commitId, commitIds) + + for await (const streamCommits of executeBatchedSelect(selectStreamCommits)) { + for (const streamCommit of streamCommits) { + // Copy `stream_commits` row to target db + await tables.streamCommits(deps.targetDb).insert(streamCommit).onConflict().ignore() + } + } + } + } + + return copiedVersionIds + } - return row - } diff --git a/packages/server/modules/workspaces/services/projects.ts b/packages/server/modules/workspaces/services/projects.ts index 886a28c99..f8d81f058 100644 --- a/packages/server/modules/workspaces/services/projects.ts +++ b/packages/server/modules/workspaces/services/projects.ts @@ -1,5 +1,8 @@ import { StreamRecord } from '@/modules/core/helpers/types' import { + CopyProjectModels, + CopyProjects, + CopyProjectVersions, GetDefaultRegion, GetWorkspace, GetWorkspaceRoleForUser, @@ -97,23 +100,23 @@ type GetWorkspaceProjectsReturnValue = { export const getWorkspaceProjectsFactory = ({ getStreams }: { getStreams: GetUserStreamsPage }) => - async ( - args: GetWorkspaceProjectsArgs, - opts: GetWorkspaceProjectsOptions - ): Promise => { - const { streams, cursor } = await getStreams({ - cursor: opts.cursor, - limit: opts.limit || 25, - searchQuery: opts.filter?.search || undefined, - workspaceId: args.workspaceId, - userId: opts.filter.userId - }) + async ( + args: GetWorkspaceProjectsArgs, + opts: GetWorkspaceProjectsOptions + ): Promise => { + const { streams, cursor } = await getStreams({ + cursor: opts.cursor, + limit: opts.limit || 25, + searchQuery: opts.filter?.search || undefined, + workspaceId: args.workspaceId, + userId: opts.filter.userId + }) - return { - items: streams, - cursor + return { + items: streams, + cursor + } } - } type MoveProjectToWorkspaceArgs = { projectId: string @@ -138,66 +141,78 @@ export const moveProjectToWorkspaceFactory = getWorkspaceRoleToDefaultProjectRoleMapping: GetWorkspaceRoleToDefaultProjectRoleMapping updateWorkspaceRole: UpdateWorkspaceRole }) => - async ({ - projectId, - workspaceId - }: MoveProjectToWorkspaceArgs): Promise => { - const project = await getProject({ projectId }) + async ({ + projectId, + workspaceId + }: MoveProjectToWorkspaceArgs): Promise => { + const project = await getProject({ projectId }) - if (!project) throw new ProjectNotFoundError() - if (project.workspaceId?.length) { - // We do not currently support moving projects between workspaces - throw new WorkspaceInvalidProjectError( - 'Specified project already belongs to a workspace. Moving between workspaces is not yet supported.' - ) - } - - // Update roles for current project members - const projectTeam = await getProjectCollaborators({ projectId }) - const workspaceTeam = await getWorkspaceRoles({ workspaceId }) - const defaultProjectRoleMapping = await getWorkspaceRoleToDefaultProjectRoleMapping( - { workspaceId } - ) - - for (const projectMembers of chunk(projectTeam, 5)) { - await Promise.all( - projectMembers.map( - async ({ id: userId, role: serverRole, streamRole: currentProjectRole }) => { - // Update workspace role. Prefer existing workspace role if there is one. - const currentWorkspaceRole = workspaceTeam.find( - (role) => role.userId === userId - ) - const nextWorkspaceRole = currentWorkspaceRole ?? { - userId, - workspaceId, - role: - serverRole === Roles.Server.Guest - ? Roles.Workspace.Guest - : Roles.Workspace.Member, - createdAt: new Date() - } - await updateWorkspaceRole(nextWorkspaceRole) - - // Update project role. Prefer default workspace project role if more permissive. - const defaultProjectRole = - defaultProjectRoleMapping[nextWorkspaceRole.role] ?? Roles.Stream.Reviewer - const nextProjectRole = orderByWeight( - [currentProjectRole, defaultProjectRole], - coreUserRoles - )[0] - await upsertProjectRole({ - userId, - projectId, - role: nextProjectRole.name as StreamRoles - }) - } + if (!project) throw new ProjectNotFoundError() + if (project.workspaceId?.length) { + // We do not currently support moving projects between workspaces + throw new WorkspaceInvalidProjectError( + 'Specified project already belongs to a workspace. Moving between workspaces is not yet supported.' ) + } + + // Update roles for current project members + const projectTeam = await getProjectCollaborators({ projectId }) + const workspaceTeam = await getWorkspaceRoles({ workspaceId }) + const defaultProjectRoleMapping = await getWorkspaceRoleToDefaultProjectRoleMapping( + { workspaceId } ) + + for (const projectMembers of chunk(projectTeam, 5)) { + await Promise.all( + projectMembers.map( + async ({ id: userId, role: serverRole, streamRole: currentProjectRole }) => { + // Update workspace role. Prefer existing workspace role if there is one. + const currentWorkspaceRole = workspaceTeam.find( + (role) => role.userId === userId + ) + const nextWorkspaceRole = currentWorkspaceRole ?? { + userId, + workspaceId, + role: + serverRole === Roles.Server.Guest + ? Roles.Workspace.Guest + : Roles.Workspace.Member, + createdAt: new Date() + } + await updateWorkspaceRole(nextWorkspaceRole) + + // Update project role. Prefer default workspace project role if more permissive. + const defaultProjectRole = + defaultProjectRoleMapping[nextWorkspaceRole.role] ?? Roles.Stream.Reviewer + const nextProjectRole = orderByWeight( + [currentProjectRole, defaultProjectRole], + coreUserRoles + )[0] + await upsertProjectRole({ + userId, + projectId, + role: nextProjectRole.name as StreamRoles + }) + } + ) + ) + } + + // Assign project to workspace + return await updateProject({ projectUpdate: { id: projectId, workspaceId } }) } - // Assign project to workspace - return await updateProject({ projectUpdate: { id: projectId, workspaceId } }) - } +export const moveProjectToRegionFactory = + (deps: { + copyProjects: CopyProjects, + copyProjectModels: CopyProjectModels, + copyProjectVersions: CopyProjectVersions + }) => + async (args: { projectId: string }): Promise => { + const projectIds = await deps.copyProjects({ projectIds: [args.projectId] }) + const modelIdsByProjectId = await deps.copyProjectModels({ projectIds }) + const versionIdsByProjectId = await deps.copyProjectVersions({ projectIds }) + } export const getWorkspaceRoleToDefaultProjectRoleMappingFactory = ({ @@ -205,19 +220,19 @@ export const getWorkspaceRoleToDefaultProjectRoleMappingFactory = }: { getWorkspace: GetWorkspace }): GetWorkspaceRoleToDefaultProjectRoleMapping => - async ({ workspaceId }) => { - const workspace = await getWorkspace({ workspaceId }) + async ({ workspaceId }) => { + const workspace = await getWorkspace({ workspaceId }) - if (!workspace) { - throw new WorkspaceNotFoundError() - } + if (!workspace) { + throw new WorkspaceNotFoundError() + } - return { - [Roles.Workspace.Guest]: null, - [Roles.Workspace.Member]: workspace.defaultProjectRole, - [Roles.Workspace.Admin]: Roles.Stream.Owner + return { + [Roles.Workspace.Guest]: null, + [Roles.Workspace.Member]: workspace.defaultProjectRole, + [Roles.Workspace.Admin]: Roles.Stream.Owner + } } - } export const updateWorkspaceProjectRoleFactory = ({ @@ -229,63 +244,63 @@ export const updateWorkspaceProjectRoleFactory = getWorkspaceRoleForUser: GetWorkspaceRoleForUser updateStreamRoleAndNotify: UpdateStreamRole }): UpdateWorkspaceProjectRole => - async ({ role, updater }) => { - const { workspaceId } = (await getStream({ streamId: role.projectId })) ?? {} - if (!workspaceId) throw new WorkspaceInvalidProjectError() + async ({ role, updater }) => { + const { workspaceId } = (await getStream({ streamId: role.projectId })) ?? {} + if (!workspaceId) throw new WorkspaceInvalidProjectError() - const currentWorkspaceRole = await getWorkspaceRoleForUser({ - workspaceId, - userId: role.userId - }) + const currentWorkspaceRole = await getWorkspaceRoleForUser({ + workspaceId, + userId: role.userId + }) - if (currentWorkspaceRole?.role === Roles.Workspace.Admin) { - // User is workspace admin and cannot have their project roles changed - throw new WorkspaceAdminError() + if (currentWorkspaceRole?.role === Roles.Workspace.Admin) { + // User is workspace admin and cannot have their project roles changed + throw new WorkspaceAdminError() + } + + if ( + currentWorkspaceRole?.role === Roles.Workspace.Guest && + role.role === Roles.Stream.Owner + ) { + // Workspace guests cannot be project owners + throw new WorkspaceInvalidRoleError('Workspace guests cannot be project owners.') + } + + return await updateStreamRoleAndNotify( + role, + updater.userId!, + updater.resourceAccessRules + ) } - if ( - currentWorkspaceRole?.role === Roles.Workspace.Guest && - role.role === Roles.Stream.Owner - ) { - // Workspace guests cannot be project owners - throw new WorkspaceInvalidRoleError('Workspace guests cannot be project owners.') - } - - return await updateStreamRoleAndNotify( - role, - updater.userId!, - updater.resourceAccessRules - ) - } - export const createWorkspaceProjectFactory = (deps: { getDefaultRegion: GetDefaultRegion }) => - async (params: { input: WorkspaceProjectCreateInput; ownerId: string }) => { - const { input, ownerId } = params - const workspaceDefaultRegion = await deps.getDefaultRegion({ - workspaceId: input.workspaceId - }) - const regionKey = workspaceDefaultRegion?.key - const projectDb = await getDb({ regionKey }) - const db = mainDb + async (params: { input: WorkspaceProjectCreateInput; ownerId: string }) => { + const { input, ownerId } = params + const workspaceDefaultRegion = await deps.getDefaultRegion({ + workspaceId: input.workspaceId + }) + const regionKey = workspaceDefaultRegion?.key + const projectDb = await getDb({ regionKey }) + const db = mainDb - // todo, use the command factory here, but for that, we need to migrate to the event bus - // deps not injected to ensure proper DB injection - const createNewProject = createNewProjectFactory({ - storeProject: storeProjectFactory({ db: projectDb }), - getProject: getProjectFactory({ db }), - deleteProject: deleteProjectFactory({ db: projectDb }), - storeModel: storeModelFactory({ db: projectDb }), - // THIS MUST GO TO THE MAIN DB - storeProjectRole: storeProjectRoleFactory({ db }), - projectsEventsEmitter: ProjectsEmitter.emit - }) + // todo, use the command factory here, but for that, we need to migrate to the event bus + // deps not injected to ensure proper DB injection + const createNewProject = createNewProjectFactory({ + storeProject: storeProjectFactory({ db: projectDb }), + getProject: getProjectFactory({ db }), + deleteProject: deleteProjectFactory({ db: projectDb }), + storeModel: storeModelFactory({ db: projectDb }), + // THIS MUST GO TO THE MAIN DB + storeProjectRole: storeProjectRoleFactory({ db }), + projectsEventsEmitter: ProjectsEmitter.emit + }) - const project = await createNewProject({ - ...input, - regionKey, - ownerId - }) + const project = await createNewProject({ + ...input, + regionKey, + ownerId + }) - return project - } + return project + }