fix(acc): query non-unique sync item urns
This commit is contained in:
@@ -1056,7 +1056,7 @@ export type CurrencyBasedPrices = {
|
||||
|
||||
export type DeleteAccSyncItemInput = {
|
||||
id: Scalars['ID']['input'];
|
||||
projectId: Scalars['ID']['input'];
|
||||
projectId: Scalars['String']['input'];
|
||||
};
|
||||
|
||||
export type DeleteModelInput = {
|
||||
@@ -4119,7 +4119,7 @@ export type TriggeredAutomationsStatus = {
|
||||
|
||||
export type UpdateAccSyncItemInput = {
|
||||
id: Scalars['ID']['input'];
|
||||
projectId: Scalars['ID']['input'];
|
||||
projectId: Scalars['String']['input'];
|
||||
status: AccSyncItemStatus;
|
||||
};
|
||||
|
||||
|
||||
@@ -39,13 +39,13 @@ enum AccSyncItemStatus {
|
||||
}
|
||||
|
||||
input DeleteAccSyncItemInput {
|
||||
projectId: ID!
|
||||
id: ID!
|
||||
projectId: String!
|
||||
}
|
||||
|
||||
input UpdateAccSyncItemInput {
|
||||
projectId: ID!
|
||||
id: ID!
|
||||
projectId: String!
|
||||
status: AccSyncItemStatus!
|
||||
}
|
||||
|
||||
|
||||
@@ -8,10 +8,6 @@ export type UpdateAccSyncItemStatus = (args: {
|
||||
status: AccSyncItemStatus
|
||||
}) => Promise<AccSyncItem | null>
|
||||
|
||||
export type GetAccSyncItemByUrn = (args: {
|
||||
lineageUrn: string
|
||||
}) => Promise<AccSyncItem | null>
|
||||
|
||||
export type GetAccSyncItemById = (args: { id: string }) => Promise<AccSyncItem | null>
|
||||
|
||||
export type ListAccSyncItems = (args: {
|
||||
@@ -24,8 +20,12 @@ export type ListAccSyncItems = (args: {
|
||||
|
||||
export type CountAccSyncItems = (args: { projectId: string }) => Promise<number>
|
||||
|
||||
export type DeleteAccSyncItemByUrn = (args: { lineageUrn: string }) => Promise<number>
|
||||
|
||||
export type DeleteAccSyncItemById = (args: { id: string }) => Promise<number>
|
||||
|
||||
export type QueryAllAccSyncItems = () => AsyncGenerator<AccSyncItem[], void, unknown>
|
||||
export type QueryAllAccSyncItems = (args: {
|
||||
batchSize?: number
|
||||
filter?: {
|
||||
status?: AccSyncItemStatus
|
||||
lineageUrn?: string
|
||||
}
|
||||
}) => AsyncGenerator<AccSyncItem[], void, unknown>
|
||||
|
||||
@@ -2,9 +2,7 @@ import { AccSyncItems } from '@/modules/acc/dbSchema'
|
||||
import type {
|
||||
CountAccSyncItems,
|
||||
DeleteAccSyncItemById,
|
||||
DeleteAccSyncItemByUrn,
|
||||
GetAccSyncItemById,
|
||||
GetAccSyncItemByUrn,
|
||||
ListAccSyncItems,
|
||||
QueryAllAccSyncItems,
|
||||
UpdateAccSyncItemStatus,
|
||||
@@ -14,7 +12,6 @@ import { executeBatchedSelect } from '@/modules/shared/helpers/dbHelper'
|
||||
import type { AccSyncItem } from '@/modules/acc/domain/types'
|
||||
import type { Knex } from 'knex'
|
||||
import { without } from 'lodash-es'
|
||||
import { AccSyncItemStatuses } from '@/modules/acc/domain/constants'
|
||||
|
||||
const tables = {
|
||||
accSyncItems: (db: Knex) => db<AccSyncItem>(AccSyncItems.name)
|
||||
@@ -32,18 +29,6 @@ export const getAccSyncItemByIdFactory =
|
||||
)
|
||||
}
|
||||
|
||||
export const getAccSyncItemByUrnFactory =
|
||||
(deps: { db: Knex }): GetAccSyncItemByUrn =>
|
||||
async ({ lineageUrn }) => {
|
||||
return (
|
||||
(await tables
|
||||
.accSyncItems(deps.db)
|
||||
.select('*')
|
||||
.where(AccSyncItems.col.accFileLineageUrn, lineageUrn)
|
||||
.first()) ?? null
|
||||
)
|
||||
}
|
||||
|
||||
export const upsertAccSyncItemFactory =
|
||||
(deps: { db: Knex }): UpsertAccSyncItem =>
|
||||
async (item) => {
|
||||
@@ -75,15 +60,6 @@ export const updateAccSyncItemStatusFactory =
|
||||
)
|
||||
}
|
||||
|
||||
export const deleteAccSyncItemByUrnFactory =
|
||||
(deps: { db: Knex }): DeleteAccSyncItemByUrn =>
|
||||
async ({ lineageUrn }) => {
|
||||
return await tables
|
||||
.accSyncItems(deps.db)
|
||||
.where(AccSyncItems.col.accFileLineageUrn, lineageUrn)
|
||||
.delete()
|
||||
}
|
||||
|
||||
export const deleteAccSyncItemByIdFactory =
|
||||
(deps: { db: Knex }): DeleteAccSyncItemById =>
|
||||
async ({ id }) => {
|
||||
@@ -121,13 +97,23 @@ export const countAccSyncItemsFactory =
|
||||
return Number.parseInt(count as string)
|
||||
}
|
||||
|
||||
export const queryAllPendingAccSyncItemsFactory =
|
||||
export const queryAllAccSyncItemsFactory =
|
||||
(deps: { db: Knex }): QueryAllAccSyncItems =>
|
||||
() => {
|
||||
const selectItems = tables
|
||||
({ batchSize = 10, filter = {} }) => {
|
||||
const { status, lineageUrn } = filter
|
||||
|
||||
const query = tables
|
||||
.accSyncItems(deps.db)
|
||||
.select<AccSyncItem[]>('*')
|
||||
.where(AccSyncItems.col.status, AccSyncItemStatuses.pending)
|
||||
.orderBy(AccSyncItems.col.createdAt)
|
||||
return executeBatchedSelect(selectItems, { batchSize: 10 })
|
||||
|
||||
if (filter.status) {
|
||||
query.where(AccSyncItems.withoutTablePrefix.col.status, status)
|
||||
}
|
||||
|
||||
if (filter.lineageUrn) {
|
||||
query.where(AccSyncItems.withoutTablePrefix.col.accFileLineageUrn, lineageUrn)
|
||||
}
|
||||
|
||||
return executeBatchedSelect(query, { batchSize })
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import {
|
||||
getAccSyncItemByUrnFactory,
|
||||
queryAllAccSyncItemsFactory,
|
||||
upsertAccSyncItemFactory
|
||||
} from '@/modules/acc/repositories/accSyncItems'
|
||||
import { onVersionAddedFactory } from '@/modules/acc/services/webhooks'
|
||||
@@ -25,7 +25,7 @@ export const accWebhooks = (app: Express) => {
|
||||
.parse(req.body?.payload)
|
||||
|
||||
const onVersionAdded = onVersionAddedFactory({
|
||||
getAccSyncItemByUrn: getAccSyncItemByUrnFactory({ db }),
|
||||
queryAllAccSyncItems: queryAllAccSyncItemsFactory({ db }),
|
||||
upsertAccSyncItem: upsertAccSyncItemFactory({ db })
|
||||
})
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import {
|
||||
queryAllPendingAccSyncItemsFactory,
|
||||
queryAllAccSyncItemsFactory,
|
||||
updateAccSyncItemStatusFactory
|
||||
} from '@/modules/acc/repositories/accSyncItems'
|
||||
import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations'
|
||||
@@ -22,17 +22,19 @@ import {
|
||||
} from '@/modules/core/repositories/tokens'
|
||||
import { createAppTokenFactory } from '@/modules/core/services/tokens'
|
||||
import { TIME_MS } from '@speckle/shared'
|
||||
import type { AccRegion } from '@/modules/acc/domain/constants'
|
||||
import { AccSyncItemStatuses, type AccRegion } from '@/modules/acc/domain/constants'
|
||||
import { triggerSyncItemAutomationFactory } from '@/modules/acc/services/automate'
|
||||
|
||||
const queryAllPendingAccSyncItems = queryAllPendingAccSyncItemsFactory({ db })
|
||||
const queryAllAccSyncItems = queryAllAccSyncItemsFactory({ db })
|
||||
|
||||
export const schedulePendingSyncItemsCheck = (deps: {
|
||||
scheduleExecution: ScheduleExecution
|
||||
}) => {
|
||||
const callback = async (_now: Date, { logger }: { logger: Logger }) => {
|
||||
const tokenData = await getToken()
|
||||
for await (const items of queryAllPendingAccSyncItems()) {
|
||||
for await (const items of queryAllAccSyncItems({
|
||||
filter: { status: AccSyncItemStatuses.pending }
|
||||
})) {
|
||||
for (const syncItem of items) {
|
||||
const manifest = await getManifestByUrn(
|
||||
{
|
||||
|
||||
@@ -191,7 +191,7 @@ export const getPaginatedAccSyncItemsFactory =
|
||||
}
|
||||
|
||||
export type UpdateAccSyncItem = (params: {
|
||||
syncItem: Pick<AccSyncItem, 'status' | 'id'>
|
||||
syncItem: Partial<AccSyncItem> & Pick<AccSyncItem, 'id'>
|
||||
}) => Promise<AccSyncItem>
|
||||
|
||||
export const updateAccSyncItemFactory =
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { AccSyncItemStatuses } from '@/modules/acc/domain/constants'
|
||||
import type {
|
||||
GetAccSyncItemByUrn,
|
||||
QueryAllAccSyncItems,
|
||||
UpsertAccSyncItem
|
||||
} from '@/modules/acc/domain/operations'
|
||||
import { logger } from '@/observability/logging'
|
||||
@@ -13,24 +13,37 @@ type OnVersionAdded = (params: {
|
||||
|
||||
export const onVersionAddedFactory =
|
||||
(deps: {
|
||||
getAccSyncItemByUrn: GetAccSyncItemByUrn
|
||||
queryAllAccSyncItems: QueryAllAccSyncItems
|
||||
upsertAccSyncItem: UpsertAccSyncItem
|
||||
}): OnVersionAdded =>
|
||||
async ({ fileLineageUrn, fileVersionUrn, fileVersionIndex }) => {
|
||||
const syncItem = await deps.getAccSyncItemByUrn({ lineageUrn: fileLineageUrn })
|
||||
for await (const syncItems of deps.queryAllAccSyncItems({
|
||||
filter: { lineageUrn: fileLineageUrn }
|
||||
})) {
|
||||
for (const syncItem of syncItems) {
|
||||
if (syncItem.accFileVersionIndex > fileVersionIndex) {
|
||||
logger.warn(
|
||||
{
|
||||
syncItemId: syncItem.id,
|
||||
currentVersion: syncItem.accFileVersionIndex,
|
||||
incomingVersion: fileVersionIndex,
|
||||
incomingAccFile: {
|
||||
fileLineageUrn,
|
||||
fileVersionUrn,
|
||||
fileVersionIndex
|
||||
}
|
||||
},
|
||||
'Received event for superseded version of sync item {syncItemId} - Current: {currentVersion} Incoming: {incomingVersion}'
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
if (!syncItem) {
|
||||
logger.warn(
|
||||
{ fileLineageUrn },
|
||||
'Received version added event for file with unknown lineage urn {lineageUrn}'
|
||||
)
|
||||
return
|
||||
await deps.upsertAccSyncItem({
|
||||
...syncItem,
|
||||
status: AccSyncItemStatuses.pending,
|
||||
accFileVersionIndex: fileVersionIndex,
|
||||
accFileVersionUrn: fileVersionUrn
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
await deps.upsertAccSyncItem({
|
||||
...syncItem,
|
||||
status: AccSyncItemStatuses.pending,
|
||||
accFileVersionIndex: fileVersionIndex,
|
||||
accFileVersionUrn: fileVersionUrn
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1081,7 +1081,7 @@ export type CurrencyBasedPrices = {
|
||||
|
||||
export type DeleteAccSyncItemInput = {
|
||||
id: Scalars['ID']['input'];
|
||||
projectId: Scalars['ID']['input'];
|
||||
projectId: Scalars['String']['input'];
|
||||
};
|
||||
|
||||
export type DeleteModelInput = {
|
||||
@@ -4144,7 +4144,7 @@ export type TriggeredAutomationsStatus = {
|
||||
|
||||
export type UpdateAccSyncItemInput = {
|
||||
id: Scalars['ID']['input'];
|
||||
projectId: Scalars['ID']['input'];
|
||||
projectId: Scalars['String']['input'];
|
||||
status: AccSyncItemStatus;
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user