feat(regions): repo functions for copying project branches and commits

This commit is contained in:
Chuck Driesler
2025-01-12 00:03:14 +00:00
parent b98160f037
commit 4d5f96bb8e
6 changed files with 751 additions and 595 deletions
@@ -170,6 +170,7 @@ type WorkspaceProjectMutations {
updateRole(input: ProjectUpdateRoleInput!): Project!
@hasStreamRole(role: STREAM_OWNER)
@hasWorkspaceRole(role: MEMBER)
moveToRegion(projectId: String!, regionKey: String!): Project!
moveToWorkspace(projectId: String!, workspaceId: String!): Project!
create(input: WorkspaceProjectCreateInput!): Project!
}
@@ -40,298 +40,105 @@ const tables = {
export const getStreamObjectsFactory =
(deps: { db: Knex }): GetStreamObjects =>
async (streamId: string, objectIds: string[]): Promise<ObjectRecord[]> => {
if (!objectIds?.length) return []
async (streamId: string, objectIds: string[]): Promise<ObjectRecord[]> => {
if (!objectIds?.length) return []
const q = tables
.objects(deps.db)
.where(Objects.col.streamId, streamId)
.whereIn(Objects.col.id, objectIds)
const q = tables
.objects(deps.db)
.where(Objects.col.streamId, streamId)
.whereIn(Objects.col.id, objectIds)
return await q
}
return await q
}
export const getObjectFactory =
(deps: { db: Knex }): GetObject =>
async (objectId: string, streamId: string): Promise<Optional<ObjectRecord>> => {
return await tables
.objects(deps.db)
.where(Objects.col.id, objectId)
.andWhere(Objects.col.streamId, streamId)
.first()
}
async (objectId: string, streamId: string): Promise<Optional<ObjectRecord>> => {
return await tables
.objects(deps.db)
.where(Objects.col.id, objectId)
.andWhere(Objects.col.streamId, streamId)
.first()
}
export const getFormattedObjectFactory =
(deps: { db: Knex }): GetFormattedObject =>
async ({ streamId, objectId }) => {
const res = await tables
.objects(deps.db)
.where({ streamId, id: objectId })
.select('*')
.first()
if (!res) return null
async ({ streamId, objectId }) => {
const res = await tables
.objects(deps.db)
.where({ streamId, id: objectId })
.select('*')
.first()
if (!res) return null
// TODO: Why tho? A lot if not most of places already just use getObjectFactory,
const finalRes: SetOptional<typeof res, 'streamId'> = res
if (finalRes.data) finalRes.data.totalChildrenCount = res.totalChildrenCount // move this back
delete finalRes.streamId // backwards compatibility
// TODO: Why tho? A lot if not most of places already just use getObjectFactory,
const finalRes: SetOptional<typeof res, 'streamId'> = res
if (finalRes.data) finalRes.data.totalChildrenCount = res.totalChildrenCount // move this back
delete finalRes.streamId // backwards compatibility
return finalRes
}
return finalRes
}
export const getBatchedStreamObjectsFactory =
(deps: { db: Knex }): GetBatchedStreamObjects =>
(streamId: string, options?: Partial<BatchedSelectOptions>) => {
const baseQuery = tables
.objects(deps.db)
.select<ObjectRecord[]>('*')
.where(Objects.col.streamId, streamId)
.orderBy(Objects.col.id)
(streamId: string, options?: Partial<BatchedSelectOptions>) => {
const baseQuery = tables
.objects(deps.db)
.select<ObjectRecord[]>('*')
.where(Objects.col.streamId, streamId)
.orderBy(Objects.col.id)
return executeBatchedSelect(baseQuery, options)
}
return executeBatchedSelect(baseQuery, options)
}
export const insertObjectsFactory =
(deps: { db: Knex }): StoreObjects =>
async (objects: ObjectRecord[], options?: Partial<{ trx: Knex.Transaction }>) => {
const q = tables.objects(deps.db).insert(objects)
if (options?.trx) q.transacting(options.trx)
return await q
}
async (objects: ObjectRecord[], options?: Partial<{ trx: Knex.Transaction }>) => {
const q = tables.objects(deps.db).insert(objects)
if (options?.trx) q.transacting(options.trx)
return await q
}
export const storeSingleObjectIfNotFoundFactory =
(deps: { db: Knex }): StoreSingleObjectIfNotFound =>
async (insertionObject) => {
await tables
.objects(deps.db)
.insert(
// knex is bothered by string being inserted into jsonb, which is actually fine
insertionObject as SpeckleObject
)
.onConflict()
.ignore()
}
async (insertionObject) => {
await tables
.objects(deps.db)
.insert(
// knex is bothered by string being inserted into jsonb, which is actually fine
insertionObject as SpeckleObject
)
.onConflict()
.ignore()
}
export const storeObjectsIfNotFoundFactory =
(deps: { db: Knex }): StoreObjectsIfNotFound =>
async (batch) => {
await tables
.objects(deps.db)
.insert(
// knex is bothered by string being inserted into jsonb, which is actually fine
batch as SpeckleObject[]
)
.onConflict()
.ignore()
}
async (batch) => {
await tables
.objects(deps.db)
.insert(
// knex is bothered by string being inserted into jsonb, which is actually fine
batch as SpeckleObject[]
)
.onConflict()
.ignore()
}
export const storeClosuresIfNotFoundFactory =
(deps: { db: Knex }): StoreClosuresIfNotFound =>
async (closuresBatch) => {
await tables
.objectChildrenClosure(deps.db)
.insert(closuresBatch)
.onConflict()
.ignore()
}
async (closuresBatch) => {
await tables
.objectChildrenClosure(deps.db)
.insert(closuresBatch)
.onConflict()
.ignore()
}
export const getObjectChildrenStreamFactory =
(deps: { db: Knex }): GetObjectChildrenStream =>
async ({ streamId, objectId }) => {
const q = deps.db.with(
'object_children_closure',
knex.raw(
`SELECT objects.id as parent, d.key as child, d.value as mindepth, ? as "streamId"
FROM objects
JOIN jsonb_each_text(objects.data->'__closure') d ON true
where objects.id = ?`,
[streamId, objectId]
)
)
q.select('id')
q.select(knex.raw('data::text as "dataText"'))
q.from('object_children_closure')
q.rightJoin('objects', function () {
this.on('objects.streamId', '=', 'object_children_closure.streamId').andOn(
'objects.id',
'=',
'object_children_closure.child'
)
})
.where(
knex.raw('object_children_closure."streamId" = ? AND parent = ?', [
streamId,
objectId
])
)
.orderBy('objects.id')
return q.stream({ highWaterMark: 500 })
}
export const getObjectsStreamFactory =
(deps: { db: Knex }): GetObjectsStream =>
async ({ streamId, objectIds }) => {
const res = tables
.objects(deps.db)
.whereIn('id', objectIds)
.andWhere('streamId', streamId)
.orderBy('id')
.select(
knex.raw(
'"id", "speckleType", "totalChildrenCount", "totalChildrenCountByDepth", "createdAt", data::text as "dataText"'
)
)
return res.stream({ highWaterMark: 500 })
}
export const hasObjectsFactory =
(deps: { db: Knex }): HasObjects =>
async ({ streamId, objectIds }) => {
const dbRes = await tables
.objects(deps.db)
.whereIn('id', objectIds)
.andWhere('streamId', streamId)
.select('id')
const res: Record<string, boolean> = {}
// eslint-disable-next-line @typescript-eslint/no-for-in-array
for (const i in objectIds) {
res[objectIds[i]] = false
}
// eslint-disable-next-line @typescript-eslint/no-for-in-array
for (const i in dbRes) {
res[dbRes[i].id] = true
}
return res
}
export const getObjectChildrenFactory =
(deps: { db: Knex }): GetObjectChildren =>
async ({ streamId, objectId, limit, depth, select, cursor }) => {
limit = toNumber(limit || 0) || 50
depth = toNumber(depth || 0) || 1000
let fullObjectSelect = false
const q = deps.db.with(
'object_children_closure',
knex.raw(
`SELECT objects.id as parent, d.key as child, d.value as mindepth, ? as "streamId"
FROM objects
JOIN jsonb_each_text(objects.data->'__closure') d ON true
where objects.id = ?`,
[streamId, objectId]
)
)
if (Array.isArray(select)) {
select.forEach((field, index) => {
q.select(
knex.raw('jsonb_path_query(data, :path) as :name:', {
path: '$.' + field,
name: '' + index
})
)
})
} else {
fullObjectSelect = true
q.select('data')
}
q.select('id')
q.select('createdAt')
q.select('speckleType')
q.select('totalChildrenCount')
q.from('object_children_closure')
q.rightJoin('objects', function () {
this.on('objects.streamId', '=', 'object_children_closure.streamId').andOn(
'objects.id',
'=',
'object_children_closure.child'
)
})
.where(
knex.raw('object_children_closure."streamId" = ? AND parent = ?', [
streamId,
objectId
])
)
.andWhere(knex.raw('object_children_closure.mindepth < ?', [depth]))
.andWhere(knex.raw('id > ?', [cursor ? cursor : '0']))
.orderBy('objects.id')
.limit(limit)
const rows = await q
if (rows.length === 0) {
return { objects: rows, cursor: null }
}
if (!fullObjectSelect)
rows.forEach((o, i, arr) => {
const no = {
id: o.id,
createdAt: o.createdAt,
speckleType: o.speckleType,
totalChildrenCount: o.totalChildrenCount,
data: {}
}
let k = 0
for (const field of select || []) {
set(no.data, field, o[k++])
}
arr[i] = no
})
const lastId = rows[rows.length - 1].id
return { objects: rows, cursor: lastId }
}
/**
* This query is inefficient on larger sets (n * 10k objects) as we need to return the total count on an arbitrarily (user) defined selection of objects.
* A possible future optimisation route would be to cache the total count of a query (as objects are immutable, it will not change) on a first run, and, if found on a subsequent round, do a simpler query and merge the total count result.
*/
export const getObjectChildrenQueryFactory =
(deps: { db: Knex }): GetObjectChildrenQuery =>
async (params) => {
const { streamId, objectId, select, query } = params
const limit = toNumber(params.limit || 0) || 50
const depth = toNumber(params.depth || 0) || 1000
const orderBy = params.orderBy || { field: 'id', direction: 'asc' }
// Cursors received by this service should be base64 encoded. They are generated on first entry query by this service; They should never be client-side generated.
const cursor: Optional<{
value: unknown
operator: string
field: string
lastSeenId?: string
}> = params.cursor
? JSON.parse(Buffer.from(params.cursor, 'base64').toString('binary'))
: undefined
// Flag that keeps track of whether we select the whole "data" part of an object or not
let fullObjectSelect = false
if (Array.isArray(select)) {
// if we order by a field that we do not select, select it!
if (orderBy && select.indexOf(orderBy.field) === -1) {
select.push(orderBy.field)
}
// // always add the id!
// if ( select.indexOf( 'id' ) === -1 ) select.unshift( 'id' )
} else {
fullObjectSelect = true
}
const additionalIdOrderBy = orderBy.field !== 'id'
const operatorsWhitelist = ['=', '>', '>=', '<', '<=', '!=']
const mainQuery = deps.db
.with(
async ({ streamId, objectId }) => {
const q = deps.db.with(
'object_children_closure',
knex.raw(
`SELECT objects.id as parent, d.key as child, d.value as mindepth, ? as "streamId"
@@ -341,201 +148,394 @@ export const getObjectChildrenQueryFactory =
[streamId, objectId]
)
)
.with('objs', (cteInnerQuery) => {
// always select the id
cteInnerQuery.select('id').from('object_children_closure')
cteInnerQuery.select('createdAt')
cteInnerQuery.select('speckleType')
cteInnerQuery.select('totalChildrenCount')
q.select('id')
q.select(knex.raw('data::text as "dataText"'))
q.from('object_children_closure')
// if there are any select fields, add them
if (Array.isArray(select)) {
select.forEach((field, index) => {
cteInnerQuery.select(
knex.raw('jsonb_path_query(data, :path) as :name:', {
path: '$.' + field,
name: '' + index
})
)
})
// otherwise, get the whole object, as stored in the jsonb column
} else {
cteInnerQuery.select('data')
q.rightJoin('objects', function () {
this.on('objects.streamId', '=', 'object_children_closure.streamId').andOn(
'objects.id',
'=',
'object_children_closure.child'
)
})
.where(
knex.raw('object_children_closure."streamId" = ? AND parent = ?', [
streamId,
objectId
])
)
.orderBy('objects.id')
return q.stream({ highWaterMark: 500 })
}
export const getObjectsStreamFactory =
(deps: { db: Knex }): GetObjectsStream =>
async ({ streamId, objectIds }) => {
const res = tables
.objects(deps.db)
.whereIn('id', objectIds)
.andWhere('streamId', streamId)
.orderBy('id')
.select(
knex.raw(
'"id", "speckleType", "totalChildrenCount", "totalChildrenCountByDepth", "createdAt", data::text as "dataText"'
)
)
return res.stream({ highWaterMark: 500 })
}
export const hasObjectsFactory =
(deps: { db: Knex }): HasObjects =>
async ({ streamId, objectIds }) => {
const dbRes = await tables
.objects(deps.db)
.whereIn('id', objectIds)
.andWhere('streamId', streamId)
.select('id')
const res: Record<string, boolean> = {}
// eslint-disable-next-line @typescript-eslint/no-for-in-array
for (const i in objectIds) {
res[objectIds[i]] = false
}
// eslint-disable-next-line @typescript-eslint/no-for-in-array
for (const i in dbRes) {
res[dbRes[i].id] = true
}
return res
}
export const getObjectChildrenFactory =
(deps: { db: Knex }): GetObjectChildren =>
async ({ streamId, objectId, limit, depth, select, cursor }) => {
limit = toNumber(limit || 0) || 50
depth = toNumber(depth || 0) || 1000
let fullObjectSelect = false
const q = deps.db.with(
'object_children_closure',
knex.raw(
`SELECT objects.id as parent, d.key as child, d.value as mindepth, ? as "streamId"
FROM objects
JOIN jsonb_each_text(objects.data->'__closure') d ON true
where objects.id = ?`,
[streamId, objectId]
)
)
if (Array.isArray(select)) {
select.forEach((field, index) => {
q.select(
knex.raw('jsonb_path_query(data, :path) as :name:', {
path: '$.' + field,
name: '' + index
})
)
})
} else {
fullObjectSelect = true
q.select('data')
}
q.select('id')
q.select('createdAt')
q.select('speckleType')
q.select('totalChildrenCount')
q.from('object_children_closure')
q.rightJoin('objects', function () {
this.on('objects.streamId', '=', 'object_children_closure.streamId').andOn(
'objects.id',
'=',
'object_children_closure.child'
)
})
.where(
knex.raw('object_children_closure."streamId" = ? AND parent = ?', [
streamId,
objectId
])
)
.andWhere(knex.raw('object_children_closure.mindepth < ?', [depth]))
.andWhere(knex.raw('id > ?', [cursor ? cursor : '0']))
.orderBy('objects.id')
.limit(limit)
const rows = await q
if (rows.length === 0) {
return { objects: rows, cursor: null }
}
if (!fullObjectSelect)
rows.forEach((o, i, arr) => {
const no = {
id: o.id,
createdAt: o.createdAt,
speckleType: o.speckleType,
totalChildrenCount: o.totalChildrenCount,
data: {}
}
let k = 0
for (const field of select || []) {
set(no.data, field, o[k++])
}
arr[i] = no
})
const lastId = rows[rows.length - 1].id
return { objects: rows, cursor: lastId }
}
/**
* This query is inefficient on larger sets (n * 10k objects) as we need to return the total count on an arbitrarily (user) defined selection of objects.
* A possible future optimisation route would be to cache the total count of a query (as objects are immutable, it will not change) on a first run, and, if found on a subsequent round, do a simpler query and merge the total count result.
*/
export const getObjectChildrenQueryFactory =
(deps: { db: Knex }): GetObjectChildrenQuery =>
async (params) => {
const { streamId, objectId, select, query } = params
const limit = toNumber(params.limit || 0) || 50
const depth = toNumber(params.depth || 0) || 1000
const orderBy = params.orderBy || { field: 'id', direction: 'asc' }
// Cursors received by this service should be base64 encoded. They are generated on first entry query by this service; They should never be client-side generated.
const cursor: Optional<{
value: unknown
operator: string
field: string
lastSeenId?: string
}> = params.cursor
? JSON.parse(Buffer.from(params.cursor, 'base64').toString('binary'))
: undefined
// Flag that keeps track of whether we select the whole "data" part of an object or not
let fullObjectSelect = false
if (Array.isArray(select)) {
// if we order by a field that we do not select, select it!
if (orderBy && select.indexOf(orderBy.field) === -1) {
select.push(orderBy.field)
}
// // always add the id!
// if ( select.indexOf( 'id' ) === -1 ) select.unshift( 'id' )
} else {
fullObjectSelect = true
}
// join on objects table
cteInnerQuery
.join('objects', function () {
this.on('objects.streamId', '=', 'object_children_closure.streamId').andOn(
'objects.id',
'=',
'object_children_closure.child'
)
})
.where('object_children_closure.streamId', streamId)
.andWhere('parent', objectId)
.andWhere('mindepth', '<', depth)
const additionalIdOrderBy = orderBy.field !== 'id'
// Add user provided filters/queries.
if (Array.isArray(query) && query.length > 0) {
cteInnerQuery.andWhere((nestedWhereQuery) => {
query.forEach((statement, index) => {
let castType = 'text'
if (typeof statement.value === 'string') castType = 'text'
if (typeof statement.value === 'boolean') castType = 'boolean'
if (typeof statement.value === 'number') castType = 'numeric'
const operatorsWhitelist = ['=', '>', '>=', '<', '<=', '!=']
if (operatorsWhitelist.indexOf(statement.operator) === -1)
throw new Error('Invalid operator for query')
const mainQuery = deps.db
.with(
'object_children_closure',
knex.raw(
`SELECT objects.id as parent, d.key as child, d.value as mindepth, ? as "streamId"
FROM objects
JOIN jsonb_each_text(objects.data->'__closure') d ON true
where objects.id = ?`,
[streamId, objectId]
)
)
.with('objs', (cteInnerQuery) => {
// always select the id
cteInnerQuery.select('id').from('object_children_closure')
cteInnerQuery.select('createdAt')
cteInnerQuery.select('speckleType')
cteInnerQuery.select('totalChildrenCount')
// Determine the correct where clause (where, and where, or where)
let whereClause: keyof typeof nestedWhereQuery
if (index === 0) whereClause = 'where'
else if (statement.verb && statement.verb.toLowerCase() === 'or')
whereClause = 'orWhere'
else whereClause = 'andWhere'
// Note: castType is generated from the statement's value and operators are matched against a whitelist.
// If comparing with strings, the jsonb_path_query(_first) func returns json encoded strings (ie, `bar` is actually `"bar"`), hence we need to add the quotes manually to the raw provided comparison value.
nestedWhereQuery[whereClause](
knex.raw(
`jsonb_path_query_first( data, ? )::${castType} ${statement.operator} ? `,
[
'$.' + statement.field,
castType === 'text' ? `"${statement.value}"` : statement.value
]
)
// if there are any select fields, add them
if (Array.isArray(select)) {
select.forEach((field, index) => {
cteInnerQuery.select(
knex.raw('jsonb_path_query(data, :path) as :name:', {
path: '$.' + field,
name: '' + index
})
)
})
})
}
// otherwise, get the whole object, as stored in the jsonb column
} else {
cteInnerQuery.select('data')
}
// Order by clause; validate direction!
const direction =
orderBy.direction && orderBy.direction.toLowerCase() === 'desc'
? 'desc'
: 'asc'
if (orderBy.field === 'id') {
cteInnerQuery.orderBy('id', direction)
// join on objects table
cteInnerQuery
.join('objects', function () {
this.on('objects.streamId', '=', 'object_children_closure.streamId').andOn(
'objects.id',
'=',
'object_children_closure.child'
)
})
.where('object_children_closure.streamId', streamId)
.andWhere('parent', objectId)
.andWhere('mindepth', '<', depth)
// Add user provided filters/queries.
if (Array.isArray(query) && query.length > 0) {
cteInnerQuery.andWhere((nestedWhereQuery) => {
query.forEach((statement, index) => {
let castType = 'text'
if (typeof statement.value === 'string') castType = 'text'
if (typeof statement.value === 'boolean') castType = 'boolean'
if (typeof statement.value === 'number') castType = 'numeric'
if (operatorsWhitelist.indexOf(statement.operator) === -1)
throw new Error('Invalid operator for query')
// Determine the correct where clause (where, and where, or where)
let whereClause: keyof typeof nestedWhereQuery
if (index === 0) whereClause = 'where'
else if (statement.verb && statement.verb.toLowerCase() === 'or')
whereClause = 'orWhere'
else whereClause = 'andWhere'
// Note: castType is generated from the statement's value and operators are matched against a whitelist.
// If comparing with strings, the jsonb_path_query(_first) func returns json encoded strings (ie, `bar` is actually `"bar"`), hence we need to add the quotes manually to the raw provided comparison value.
nestedWhereQuery[whereClause](
knex.raw(
`jsonb_path_query_first( data, ? )::${castType} ${statement.operator} ? `,
[
'$.' + statement.field,
castType === 'text' ? `"${statement.value}"` : statement.value
]
)
)
})
})
}
// Order by clause; validate direction!
const direction =
orderBy.direction && orderBy.direction.toLowerCase() === 'desc'
? 'desc'
: 'asc'
if (orderBy.field === 'id') {
cteInnerQuery.orderBy('id', direction)
} else {
cteInnerQuery.orderByRaw(
knex.raw(`jsonb_path_query_first( data, ? ) ${direction}, id asc`, [
'$.' + orderBy.field
])
)
}
})
.select('*')
.from('objs')
.joinRaw('RIGHT JOIN ( SELECT count(*) FROM "objs" ) c(total_count) ON TRUE')
// Set cursor clause, if present. If it's not present, it's an entry query; this method will return a cursor based on its given query.
// We have implemented keyset pagination for more efficient searches on larger sets. This approach depends on an order by value provided by the user and a (hidden) primary key.
// logger.debug( cursor )
if (cursor) {
let castType = 'text'
if (typeof cursor.value === 'string') castType = 'text'
if (typeof cursor.value === 'boolean') castType = 'boolean'
if (typeof cursor.value === 'number') castType = 'numeric'
// When strings are used inside an order clause, as mentioned above, we need to add quotes around the comparison value, as the jsonb_path_query funcs return json encoded strings (`{"test":"foo"}` => test is returned as `"foo"`)
if (castType === 'text') cursor.value = `"${cursor.value}"`
if (operatorsWhitelist.indexOf(cursor.operator) === -1)
throw new Error('Invalid operator for cursor')
// Unwrapping the tuple comparison of ( userOrderByField, id ) > ( lastValueOfUserOrderBy, lastSeenId )
if (fullObjectSelect) {
if (cursor.field === 'id') {
mainQuery.where(knex.raw(`id ${cursor.operator} ? `, [cursor.value]))
} else {
mainQuery.where(
knex.raw(
`jsonb_path_query_first( data, ? )::${castType} ${cursor.operator}= ? `,
['$.' + cursor.field, cursor.value]
)
)
}
} else {
cteInnerQuery.orderByRaw(
knex.raw(`jsonb_path_query_first( data, ? ) ${direction}, id asc`, [
'$.' + orderBy.field
mainQuery.where(
knex.raw(`??::${castType} ${cursor.operator}= ? `, [
(select || []).indexOf(cursor.field).toString(),
cursor.value
])
)
}
})
.select('*')
.from('objs')
.joinRaw('RIGHT JOIN ( SELECT count(*) FROM "objs" ) c(total_count) ON TRUE')
// Set cursor clause, if present. If it's not present, it's an entry query; this method will return a cursor based on its given query.
// We have implemented keyset pagination for more efficient searches on larger sets. This approach depends on an order by value provided by the user and a (hidden) primary key.
// logger.debug( cursor )
if (cursor) {
let castType = 'text'
if (typeof cursor.value === 'string') castType = 'text'
if (typeof cursor.value === 'boolean') castType = 'boolean'
if (typeof cursor.value === 'number') castType = 'numeric'
// When strings are used inside an order clause, as mentioned above, we need to add quotes around the comparison value, as the jsonb_path_query funcs return json encoded strings (`{"test":"foo"}` => test is returned as `"foo"`)
if (castType === 'text') cursor.value = `"${cursor.value}"`
if (operatorsWhitelist.indexOf(cursor.operator) === -1)
throw new Error('Invalid operator for cursor')
// Unwrapping the tuple comparison of ( userOrderByField, id ) > ( lastValueOfUserOrderBy, lastSeenId )
if (fullObjectSelect) {
if (cursor.field === 'id') {
mainQuery.where(knex.raw(`id ${cursor.operator} ? `, [cursor.value]))
} else {
mainQuery.where(
knex.raw(
`jsonb_path_query_first( data, ? )::${castType} ${cursor.operator}= ? `,
['$.' + cursor.field, cursor.value]
)
)
if (cursor.lastSeenId) {
mainQuery.andWhere((qb) => {
// Dunno what the TS issue here is, the JS code itself seemed to work fine as JS
// eslint-disable-next-line @typescript-eslint/no-explicit-any
qb.where('id' as any, '>', cursor!.lastSeenId as any)
if (fullObjectSelect)
qb.orWhere(
knex.raw(
`jsonb_path_query_first( data, ? )::${castType} ${cursor!.operator} ? `,
['$.' + cursor!.field, cursor!.value]
)
)
else
qb.orWhere(
knex.raw(`??::${castType} ${cursor!.operator} ? `, [
(select || []).indexOf(cursor!.field).toString(),
cursor!.value
])
)
})
}
} else {
mainQuery.where(
knex.raw(`??::${castType} ${cursor.operator}= ? `, [
(select || []).indexOf(cursor.field).toString(),
cursor.value
])
)
}
if (cursor.lastSeenId) {
mainQuery.andWhere((qb) => {
// Dunno what the TS issue here is, the JS code itself seemed to work fine as JS
// eslint-disable-next-line @typescript-eslint/no-explicit-any
qb.where('id' as any, '>', cursor!.lastSeenId as any)
if (fullObjectSelect)
qb.orWhere(
knex.raw(
`jsonb_path_query_first( data, ? )::${castType} ${cursor!.operator} ? `,
['$.' + cursor!.field, cursor!.value]
)
)
else
qb.orWhere(
knex.raw(`??::${castType} ${cursor!.operator} ? `, [
(select || []).indexOf(cursor!.field).toString(),
cursor!.value
])
)
mainQuery.limit(limit)
// logger.debug( mainQuery.toString() )
// Finally, execute the query
const rows = await mainQuery
const totalCount = rows && rows.length > 0 ? parseInt(rows[0].total_count) : 0
// Return early
if (totalCount === 0) return { totalCount, objects: [], cursor: null }
// Reconstruct the object based on the provided select paths.
if (!fullObjectSelect) {
rows.forEach((o, i, arr) => {
const no = {
id: o.id,
createdAt: o.createdAt,
speckleType: o.speckleType,
totalChildrenCount: o.totalChildrenCount,
data: {}
}
let k = 0
for (const field of select || []) {
set(no.data, field, o[k++])
}
arr[i] = no
})
}
// Assemble the cursor for an eventual next call
const cursorObj: typeof cursor = {
field: cursor?.field || orderBy.field,
operator:
cursor?.operator ||
(orderBy.direction && orderBy.direction.toLowerCase() === 'desc' ? '<' : '>'),
value: get(rows[rows.length - 1], `data.${orderBy.field}`)
}
// If we're not ordering by id (default case, where no order by argument is provided), we need to add the last seen id of this query in order to enable keyset pagination.
if (additionalIdOrderBy) {
cursorObj.lastSeenId = rows[rows.length - 1].id
}
// Cursor objects should be client-side opaque, hence we encode them to base64.
const cursorEncoded = Buffer.from(JSON.stringify(cursorObj), 'binary').toString(
'base64'
)
return {
totalCount,
objects: rows,
cursor: rows.length === limit ? cursorEncoded : null
}
}
mainQuery.limit(limit)
// logger.debug( mainQuery.toString() )
// Finally, execute the query
const rows = await mainQuery
const totalCount = rows && rows.length > 0 ? parseInt(rows[0].total_count) : 0
// Return early
if (totalCount === 0) return { totalCount, objects: [], cursor: null }
// Reconstruct the object based on the provided select paths.
if (!fullObjectSelect) {
rows.forEach((o, i, arr) => {
const no = {
id: o.id,
createdAt: o.createdAt,
speckleType: o.speckleType,
totalChildrenCount: o.totalChildrenCount,
data: {}
}
let k = 0
for (const field of select || []) {
set(no.data, field, o[k++])
}
arr[i] = no
})
}
// Assemble the cursor for an eventual next call
const cursorObj: typeof cursor = {
field: cursor?.field || orderBy.field,
operator:
cursor?.operator ||
(orderBy.direction && orderBy.direction.toLowerCase() === 'desc' ? '<' : '>'),
value: get(rows[rows.length - 1], `data.${orderBy.field}`)
}
// If we're not ordering by id (default case, where no order by argument is provided), we need to add the last seen id of this query in order to enable keyset pagination.
if (additionalIdOrderBy) {
cursorObj.lastSeenId = rows[rows.length - 1].id
}
// Cursor objects should be client-side opaque, hence we encode them to base64.
const cursorEncoded = Buffer.from(JSON.stringify(cursorObj), 'binary').toString(
'base64'
)
return {
totalCount,
objects: rows,
cursor: rows.length === limit ? cursorEncoded : null
}
}
@@ -22,7 +22,7 @@ export async function* executeBatchedSelect<
>(
selectQuery: Knex.QueryBuilder<TRecord, TResult>,
options?: Partial<BatchedSelectOptions>
): AsyncGenerator<TResult, void, unknown> {
): AsyncGenerator<Awaited<(typeof selectQuery)>, void, unknown> {
const { batchSize = 100, trx } = options || {}
if (trx) selectQuery.transacting(trx)
@@ -33,7 +33,7 @@ export async function* executeBatchedSelect<
let currentOffset = 0
while (hasMorePages) {
const q = selectQuery.clone().offset(currentOffset)
const results = (await q) as TResult
const results = (await q) as Awaited<(typeof selectQuery)>
if (!results.length) {
hasMorePages = false
@@ -308,3 +308,11 @@ export type GetWorkspaceCreationState = (params: {
export type UpsertWorkspaceCreationState = (params: {
workspaceCreationState: WorkspaceCreationState
}) => Promise<void>
/**
* Project regions
*/
export type CopyProjects = (params: { projectIds: string[] }) => Promise<string[]>
export type CopyProjectModels = (params: { projectIds: string[] }) => Promise<Record<string, string[]>>
export type CopyProjectVersions = (params: { projectIds: string[] }) => Promise<Record<string, string[]>>
@@ -1,7 +1,15 @@
import { buildTableHelper } from '@/modules/core/dbSchema'
import { BranchCommits, Branches, buildTableHelper, Commits, StreamCommits, StreamFavorites, Streams, StreamsMeta } from '@/modules/core/dbSchema'
import { Branch } from '@/modules/core/domain/branches/types'
import { Commit } from '@/modules/core/domain/commits/types'
import { Stream } from '@/modules/core/domain/streams/types'
import { BranchCommitRecord, StreamCommitRecord, StreamFavoriteRecord } from '@/modules/core/helpers/types'
import { RegionRecord } from '@/modules/multiregion/helpers/types'
import { Regions } from '@/modules/multiregion/repositories'
import { executeBatchedSelect } from '@/modules/shared/helpers/dbHelper'
import {
CopyProjectModels,
CopyProjects,
CopyProjectVersions,
GetDefaultRegion,
UpsertRegionAssignment
} from '@/modules/workspaces/domain/operations'
@@ -15,32 +23,156 @@ export const WorkspaceRegions = buildTableHelper('workspace_regions', [
const tables = {
workspaceRegions: (db: Knex) => db<WorkspaceRegionAssignment>(WorkspaceRegions.name),
regions: (db: Knex) => db<RegionRecord>(Regions.name)
regions: (db: Knex) => db<RegionRecord>(Regions.name),
projects: (db: Knex) => db<Stream>(Streams.name),
models: (db: Knex) => db<Branch>(Branches.name),
versions: (db: Knex) => db<Commit>(Commits.name),
branchCommits: (db: Knex) => db<BranchCommitRecord>(BranchCommits.name),
streamCommits: (db: Knex) => db<StreamCommitRecord>(StreamCommits.name),
streamFavorites: (db: Knex) => db<StreamFavoriteRecord>(StreamFavorites.name),
streamsMeta: (db: Knex) => db(StreamsMeta.name)
}
export const upsertRegionAssignmentFactory =
(deps: { db: Knex }): UpsertRegionAssignment =>
async (params) => {
const { workspaceId, regionKey } = params
const [row] = await tables
.workspaceRegions(deps.db)
.insert({ workspaceId, regionKey }, '*')
.onConflict(['workspaceId', 'regionKey'])
.merge()
async (params) => {
const { workspaceId, regionKey } = params
const [row] = await tables
.workspaceRegions(deps.db)
.insert({ workspaceId, regionKey }, '*')
.onConflict(['workspaceId', 'regionKey'])
.merge()
return row
}
return row
}
export const getDefaultRegionFactory =
(deps: { db: Knex }): GetDefaultRegion =>
async (params) => {
const { workspaceId } = params
const row = await tables
.regions(deps.db)
.select<RegionRecord>(Regions.cols)
.join(WorkspaceRegions.name, WorkspaceRegions.col.regionKey, Regions.col.key)
.where({ [WorkspaceRegions.col.workspaceId]: workspaceId })
.first()
async (params) => {
const { workspaceId } = params
const row = await tables
.regions(deps.db)
.select<RegionRecord>(Regions.cols)
.join(WorkspaceRegions.name, WorkspaceRegions.col.regionKey, Regions.col.key)
.where({ [WorkspaceRegions.col.workspaceId]: workspaceId })
.first()
return row
}
export const copyProjects =
(deps: { sourceDb: Knex, targetDb: Knex }): CopyProjects =>
async ({ projectIds }) => {
const selectProjects = tables.projects(deps.sourceDb).select('*').whereIn(Streams.col.id, projectIds)
const copiedProjectIds: string[] = []
// Copy project record
for await (const projects of executeBatchedSelect(selectProjects)) {
for (const project of projects) {
// Store copied project id
copiedProjectIds.push(project.id)
// Copy `streams` row to target db
await tables.projects(deps.targetDb)
.insert(project)
.onConflict()
.ignore()
}
const projectIds = projects.map((project) => project.id)
// Fetch `stream_favorites` rows for projects in batch
const selectStreamFavorites = tables.streamFavorites(deps.sourceDb).select('*').whereIn(StreamFavorites.col.streamId, projectIds)
for await (const streamFavorites of executeBatchedSelect(selectStreamFavorites)) {
for (const streamFavorite of streamFavorites) {
// Copy `stream_favorites` row to target db
await tables.streamFavorites(deps.targetDb).insert(streamFavorite).onConflict().ignore()
}
}
// Fetch `streams_meta` rows for projects in batch
const selectStreamsMetadata = tables.streamsMeta(deps.sourceDb).select('*').whereIn(StreamsMeta.col.streamId, projectIds)
for await (const streamsMetadataBatch of executeBatchedSelect(selectStreamsMetadata)) {
for (const streamMetadata of streamsMetadataBatch) {
// Copy `streams_meta` row to target db
await tables.streamsMeta(deps.targetDb).insert(streamMetadata).onConflict().ignore()
}
}
}
return copiedProjectIds
}
export const copyProjectModels =
(deps: { sourceDb: Knex, targetDb: Knex }): CopyProjectModels =>
async ({ projectIds }) => {
const copiedModelIds: Record<string, string[]> = projectIds.reduce((result, id) => ({ ...result, [id]: [] }), {})
for (const projectId of projectIds) {
const selectModels = tables.models(deps.sourceDb).select('*').where({ streamId: projectId })
for await (const models of executeBatchedSelect(selectModels)) {
for (const model of models) {
// Store copied model ids
copiedModelIds[projectId].push(model.id)
// Copy `branches` row to target db
await tables.models(deps.targetDb).insert(model).onConflict().ignore()
}
}
}
return copiedModelIds
}
export const copyProjectVersions =
(deps: { sourceDb: Knex, targetDb: Knex }): CopyProjectVersions =>
async ({ projectIds }) => {
const copiedVersionIds: Record<string, string[]> = projectIds.reduce((result, id) => ({ ...result, [id]: [] }), {})
for (const projectId of projectIds) {
const selectVersions = tables.streamCommits(deps.sourceDb).select('*')
.join<StreamCommitRecord & Commit>(Commits.name, Commits.col.id, StreamCommits.col.commitId)
.where({ streamId: projectId })
for await (const versions of executeBatchedSelect(selectVersions)) {
for (const version of versions) {
const { commitId, ...commit } = version
// Store copied version id
copiedVersionIds[projectId].push(commitId)
// Copy `commits` row to target db
await tables.versions(deps.targetDb).insert(commit).onConflict().ignore()
}
const commitIds = versions.map((version) => version.commitId)
// Fetch `branch_commits` rows for versions in batch
const selectBranchCommits = tables.branchCommits(deps.sourceDb).select('*').whereIn(BranchCommits.col.commitId, commitIds)
for await (const branchCommits of executeBatchedSelect(selectBranchCommits)) {
for (const branchCommit of branchCommits) {
// Copy `branch_commits` row to target db
await tables.branchCommits(deps.targetDb).insert(branchCommit).onConflict().ignore()
}
}
// Fetch `stream_commits` rows for versions in batch
const selectStreamCommits = tables.streamCommits(deps.sourceDb).select('*').whereIn(StreamCommits.col.commitId, commitIds)
for await (const streamCommits of executeBatchedSelect(selectStreamCommits)) {
for (const streamCommit of streamCommits) {
// Copy `stream_commits` row to target db
await tables.streamCommits(deps.targetDb).insert(streamCommit).onConflict().ignore()
}
}
}
}
return copiedVersionIds
}
return row
}
@@ -1,5 +1,8 @@
import { StreamRecord } from '@/modules/core/helpers/types'
import {
CopyProjectModels,
CopyProjects,
CopyProjectVersions,
GetDefaultRegion,
GetWorkspace,
GetWorkspaceRoleForUser,
@@ -97,23 +100,23 @@ type GetWorkspaceProjectsReturnValue = {
export const getWorkspaceProjectsFactory =
({ getStreams }: { getStreams: GetUserStreamsPage }) =>
async (
args: GetWorkspaceProjectsArgs,
opts: GetWorkspaceProjectsOptions
): Promise<GetWorkspaceProjectsReturnValue> => {
const { streams, cursor } = await getStreams({
cursor: opts.cursor,
limit: opts.limit || 25,
searchQuery: opts.filter?.search || undefined,
workspaceId: args.workspaceId,
userId: opts.filter.userId
})
async (
args: GetWorkspaceProjectsArgs,
opts: GetWorkspaceProjectsOptions
): Promise<GetWorkspaceProjectsReturnValue> => {
const { streams, cursor } = await getStreams({
cursor: opts.cursor,
limit: opts.limit || 25,
searchQuery: opts.filter?.search || undefined,
workspaceId: args.workspaceId,
userId: opts.filter.userId
})
return {
items: streams,
cursor
return {
items: streams,
cursor
}
}
}
type MoveProjectToWorkspaceArgs = {
projectId: string
@@ -138,66 +141,78 @@ export const moveProjectToWorkspaceFactory =
getWorkspaceRoleToDefaultProjectRoleMapping: GetWorkspaceRoleToDefaultProjectRoleMapping
updateWorkspaceRole: UpdateWorkspaceRole
}) =>
async ({
projectId,
workspaceId
}: MoveProjectToWorkspaceArgs): Promise<StreamRecord> => {
const project = await getProject({ projectId })
async ({
projectId,
workspaceId
}: MoveProjectToWorkspaceArgs): Promise<StreamRecord> => {
const project = await getProject({ projectId })
if (!project) throw new ProjectNotFoundError()
if (project.workspaceId?.length) {
// We do not currently support moving projects between workspaces
throw new WorkspaceInvalidProjectError(
'Specified project already belongs to a workspace. Moving between workspaces is not yet supported.'
)
}
// Update roles for current project members
const projectTeam = await getProjectCollaborators({ projectId })
const workspaceTeam = await getWorkspaceRoles({ workspaceId })
const defaultProjectRoleMapping = await getWorkspaceRoleToDefaultProjectRoleMapping(
{ workspaceId }
)
for (const projectMembers of chunk(projectTeam, 5)) {
await Promise.all(
projectMembers.map(
async ({ id: userId, role: serverRole, streamRole: currentProjectRole }) => {
// Update workspace role. Prefer existing workspace role if there is one.
const currentWorkspaceRole = workspaceTeam.find(
(role) => role.userId === userId
)
const nextWorkspaceRole = currentWorkspaceRole ?? {
userId,
workspaceId,
role:
serverRole === Roles.Server.Guest
? Roles.Workspace.Guest
: Roles.Workspace.Member,
createdAt: new Date()
}
await updateWorkspaceRole(nextWorkspaceRole)
// Update project role. Prefer default workspace project role if more permissive.
const defaultProjectRole =
defaultProjectRoleMapping[nextWorkspaceRole.role] ?? Roles.Stream.Reviewer
const nextProjectRole = orderByWeight(
[currentProjectRole, defaultProjectRole],
coreUserRoles
)[0]
await upsertProjectRole({
userId,
projectId,
role: nextProjectRole.name as StreamRoles
})
}
if (!project) throw new ProjectNotFoundError()
if (project.workspaceId?.length) {
// We do not currently support moving projects between workspaces
throw new WorkspaceInvalidProjectError(
'Specified project already belongs to a workspace. Moving between workspaces is not yet supported.'
)
}
// Update roles for current project members
const projectTeam = await getProjectCollaborators({ projectId })
const workspaceTeam = await getWorkspaceRoles({ workspaceId })
const defaultProjectRoleMapping = await getWorkspaceRoleToDefaultProjectRoleMapping(
{ workspaceId }
)
for (const projectMembers of chunk(projectTeam, 5)) {
await Promise.all(
projectMembers.map(
async ({ id: userId, role: serverRole, streamRole: currentProjectRole }) => {
// Update workspace role. Prefer existing workspace role if there is one.
const currentWorkspaceRole = workspaceTeam.find(
(role) => role.userId === userId
)
const nextWorkspaceRole = currentWorkspaceRole ?? {
userId,
workspaceId,
role:
serverRole === Roles.Server.Guest
? Roles.Workspace.Guest
: Roles.Workspace.Member,
createdAt: new Date()
}
await updateWorkspaceRole(nextWorkspaceRole)
// Update project role. Prefer default workspace project role if more permissive.
const defaultProjectRole =
defaultProjectRoleMapping[nextWorkspaceRole.role] ?? Roles.Stream.Reviewer
const nextProjectRole = orderByWeight(
[currentProjectRole, defaultProjectRole],
coreUserRoles
)[0]
await upsertProjectRole({
userId,
projectId,
role: nextProjectRole.name as StreamRoles
})
}
)
)
}
// Assign project to workspace
return await updateProject({ projectUpdate: { id: projectId, workspaceId } })
}
// Assign project to workspace
return await updateProject({ projectUpdate: { id: projectId, workspaceId } })
}
export const moveProjectToRegionFactory =
(deps: {
copyProjects: CopyProjects,
copyProjectModels: CopyProjectModels,
copyProjectVersions: CopyProjectVersions
}) =>
async (args: { projectId: string }): Promise<void> => {
const projectIds = await deps.copyProjects({ projectIds: [args.projectId] })
const modelIdsByProjectId = await deps.copyProjectModels({ projectIds })
const versionIdsByProjectId = await deps.copyProjectVersions({ projectIds })
}
export const getWorkspaceRoleToDefaultProjectRoleMappingFactory =
({
@@ -205,19 +220,19 @@ export const getWorkspaceRoleToDefaultProjectRoleMappingFactory =
}: {
getWorkspace: GetWorkspace
}): GetWorkspaceRoleToDefaultProjectRoleMapping =>
async ({ workspaceId }) => {
const workspace = await getWorkspace({ workspaceId })
async ({ workspaceId }) => {
const workspace = await getWorkspace({ workspaceId })
if (!workspace) {
throw new WorkspaceNotFoundError()
}
if (!workspace) {
throw new WorkspaceNotFoundError()
}
return {
[Roles.Workspace.Guest]: null,
[Roles.Workspace.Member]: workspace.defaultProjectRole,
[Roles.Workspace.Admin]: Roles.Stream.Owner
return {
[Roles.Workspace.Guest]: null,
[Roles.Workspace.Member]: workspace.defaultProjectRole,
[Roles.Workspace.Admin]: Roles.Stream.Owner
}
}
}
export const updateWorkspaceProjectRoleFactory =
({
@@ -229,63 +244,63 @@ export const updateWorkspaceProjectRoleFactory =
getWorkspaceRoleForUser: GetWorkspaceRoleForUser
updateStreamRoleAndNotify: UpdateStreamRole
}): UpdateWorkspaceProjectRole =>
async ({ role, updater }) => {
const { workspaceId } = (await getStream({ streamId: role.projectId })) ?? {}
if (!workspaceId) throw new WorkspaceInvalidProjectError()
async ({ role, updater }) => {
const { workspaceId } = (await getStream({ streamId: role.projectId })) ?? {}
if (!workspaceId) throw new WorkspaceInvalidProjectError()
const currentWorkspaceRole = await getWorkspaceRoleForUser({
workspaceId,
userId: role.userId
})
const currentWorkspaceRole = await getWorkspaceRoleForUser({
workspaceId,
userId: role.userId
})
if (currentWorkspaceRole?.role === Roles.Workspace.Admin) {
// User is workspace admin and cannot have their project roles changed
throw new WorkspaceAdminError()
if (currentWorkspaceRole?.role === Roles.Workspace.Admin) {
// User is workspace admin and cannot have their project roles changed
throw new WorkspaceAdminError()
}
if (
currentWorkspaceRole?.role === Roles.Workspace.Guest &&
role.role === Roles.Stream.Owner
) {
// Workspace guests cannot be project owners
throw new WorkspaceInvalidRoleError('Workspace guests cannot be project owners.')
}
return await updateStreamRoleAndNotify(
role,
updater.userId!,
updater.resourceAccessRules
)
}
if (
currentWorkspaceRole?.role === Roles.Workspace.Guest &&
role.role === Roles.Stream.Owner
) {
// Workspace guests cannot be project owners
throw new WorkspaceInvalidRoleError('Workspace guests cannot be project owners.')
}
return await updateStreamRoleAndNotify(
role,
updater.userId!,
updater.resourceAccessRules
)
}
export const createWorkspaceProjectFactory =
(deps: { getDefaultRegion: GetDefaultRegion }) =>
async (params: { input: WorkspaceProjectCreateInput; ownerId: string }) => {
const { input, ownerId } = params
const workspaceDefaultRegion = await deps.getDefaultRegion({
workspaceId: input.workspaceId
})
const regionKey = workspaceDefaultRegion?.key
const projectDb = await getDb({ regionKey })
const db = mainDb
async (params: { input: WorkspaceProjectCreateInput; ownerId: string }) => {
const { input, ownerId } = params
const workspaceDefaultRegion = await deps.getDefaultRegion({
workspaceId: input.workspaceId
})
const regionKey = workspaceDefaultRegion?.key
const projectDb = await getDb({ regionKey })
const db = mainDb
// todo, use the command factory here, but for that, we need to migrate to the event bus
// deps not injected to ensure proper DB injection
const createNewProject = createNewProjectFactory({
storeProject: storeProjectFactory({ db: projectDb }),
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db: projectDb }),
storeModel: storeModelFactory({ db: projectDb }),
// THIS MUST GO TO THE MAIN DB
storeProjectRole: storeProjectRoleFactory({ db }),
projectsEventsEmitter: ProjectsEmitter.emit
})
// todo, use the command factory here, but for that, we need to migrate to the event bus
// deps not injected to ensure proper DB injection
const createNewProject = createNewProjectFactory({
storeProject: storeProjectFactory({ db: projectDb }),
getProject: getProjectFactory({ db }),
deleteProject: deleteProjectFactory({ db: projectDb }),
storeModel: storeModelFactory({ db: projectDb }),
// THIS MUST GO TO THE MAIN DB
storeProjectRole: storeProjectRoleFactory({ db }),
projectsEventsEmitter: ProjectsEmitter.emit
})
const project = await createNewProject({
...input,
regionKey,
ownerId
})
const project = await createNewProject({
...input,
regionKey,
ownerId
})
return project
}
return project
}