diff --git a/packages/frontend-2/lib/common/generated/gql/graphql.ts b/packages/frontend-2/lib/common/generated/gql/graphql.ts index 944a40876..5b5779bff 100644 --- a/packages/frontend-2/lib/common/generated/gql/graphql.ts +++ b/packages/frontend-2/lib/common/generated/gql/graphql.ts @@ -1056,7 +1056,7 @@ export type CurrencyBasedPrices = { export type DeleteAccSyncItemInput = { id: Scalars['ID']['input']; - projectId: Scalars['ID']['input']; + projectId: Scalars['String']['input']; }; export type DeleteModelInput = { @@ -4119,7 +4119,7 @@ export type TriggeredAutomationsStatus = { export type UpdateAccSyncItemInput = { id: Scalars['ID']['input']; - projectId: Scalars['ID']['input']; + projectId: Scalars['String']['input']; status: AccSyncItemStatus; }; diff --git a/packages/server/assets/acc/typedefs/accSyncItems.graphql b/packages/server/assets/acc/typedefs/accSyncItems.graphql index 35c2bb9ae..7798e6227 100644 --- a/packages/server/assets/acc/typedefs/accSyncItems.graphql +++ b/packages/server/assets/acc/typedefs/accSyncItems.graphql @@ -39,13 +39,13 @@ enum AccSyncItemStatus { } input DeleteAccSyncItemInput { - projectId: ID! id: ID! + projectId: String! } input UpdateAccSyncItemInput { - projectId: ID! id: ID! + projectId: String! status: AccSyncItemStatus! } diff --git a/packages/server/modules/acc/domain/operations.ts b/packages/server/modules/acc/domain/operations.ts index bcb741c44..6436dab8a 100644 --- a/packages/server/modules/acc/domain/operations.ts +++ b/packages/server/modules/acc/domain/operations.ts @@ -8,10 +8,6 @@ export type UpdateAccSyncItemStatus = (args: { status: AccSyncItemStatus }) => Promise -export type GetAccSyncItemByUrn = (args: { - lineageUrn: string -}) => Promise - export type GetAccSyncItemById = (args: { id: string }) => Promise export type ListAccSyncItems = (args: { @@ -24,8 +20,12 @@ export type ListAccSyncItems = (args: { export type CountAccSyncItems = (args: { projectId: string }) => Promise -export type DeleteAccSyncItemByUrn = (args: { lineageUrn: string }) => Promise - export type DeleteAccSyncItemById = (args: { id: string }) => Promise -export type QueryAllAccSyncItems = () => AsyncGenerator +export type QueryAllAccSyncItems = (args: { + batchSize?: number + filter?: { + status?: AccSyncItemStatus + lineageUrn?: string + } +}) => AsyncGenerator diff --git a/packages/server/modules/acc/repositories/accSyncItems.ts b/packages/server/modules/acc/repositories/accSyncItems.ts index 68ced2315..f9de48e06 100644 --- a/packages/server/modules/acc/repositories/accSyncItems.ts +++ b/packages/server/modules/acc/repositories/accSyncItems.ts @@ -2,9 +2,7 @@ import { AccSyncItems } from '@/modules/acc/dbSchema' import type { CountAccSyncItems, DeleteAccSyncItemById, - DeleteAccSyncItemByUrn, GetAccSyncItemById, - GetAccSyncItemByUrn, ListAccSyncItems, QueryAllAccSyncItems, UpdateAccSyncItemStatus, @@ -14,7 +12,6 @@ import { executeBatchedSelect } from '@/modules/shared/helpers/dbHelper' import type { AccSyncItem } from '@/modules/acc/domain/types' import type { Knex } from 'knex' import { without } from 'lodash-es' -import { AccSyncItemStatuses } from '@/modules/acc/domain/constants' const tables = { accSyncItems: (db: Knex) => db(AccSyncItems.name) @@ -32,18 +29,6 @@ export const getAccSyncItemByIdFactory = ) } -export const getAccSyncItemByUrnFactory = - (deps: { db: Knex }): GetAccSyncItemByUrn => - async ({ lineageUrn }) => { - return ( - (await tables - .accSyncItems(deps.db) - .select('*') - .where(AccSyncItems.col.accFileLineageUrn, lineageUrn) - .first()) ?? null - ) - } - export const upsertAccSyncItemFactory = (deps: { db: Knex }): UpsertAccSyncItem => async (item) => { @@ -75,15 +60,6 @@ export const updateAccSyncItemStatusFactory = ) } -export const deleteAccSyncItemByUrnFactory = - (deps: { db: Knex }): DeleteAccSyncItemByUrn => - async ({ lineageUrn }) => { - return await tables - .accSyncItems(deps.db) - .where(AccSyncItems.col.accFileLineageUrn, lineageUrn) - .delete() - } - export const deleteAccSyncItemByIdFactory = (deps: { db: Knex }): DeleteAccSyncItemById => async ({ id }) => { @@ -121,13 +97,23 @@ export const countAccSyncItemsFactory = return Number.parseInt(count as string) } -export const queryAllPendingAccSyncItemsFactory = +export const queryAllAccSyncItemsFactory = (deps: { db: Knex }): QueryAllAccSyncItems => - () => { - const selectItems = tables + ({ batchSize = 10, filter = {} }) => { + const { status, lineageUrn } = filter + + const query = tables .accSyncItems(deps.db) .select('*') - .where(AccSyncItems.col.status, AccSyncItemStatuses.pending) .orderBy(AccSyncItems.col.createdAt) - return executeBatchedSelect(selectItems, { batchSize: 10 }) + + if (filter.status) { + query.where(AccSyncItems.withoutTablePrefix.col.status, status) + } + + if (filter.lineageUrn) { + query.where(AccSyncItems.withoutTablePrefix.col.accFileLineageUrn, lineageUrn) + } + + return executeBatchedSelect(query, { batchSize }) } diff --git a/packages/server/modules/acc/rest/webhooks.ts b/packages/server/modules/acc/rest/webhooks.ts index 529c5e973..72fc61874 100644 --- a/packages/server/modules/acc/rest/webhooks.ts +++ b/packages/server/modules/acc/rest/webhooks.ts @@ -1,5 +1,5 @@ import { - getAccSyncItemByUrnFactory, + queryAllAccSyncItemsFactory, upsertAccSyncItemFactory } from '@/modules/acc/repositories/accSyncItems' import { onVersionAddedFactory } from '@/modules/acc/services/webhooks' @@ -25,7 +25,7 @@ export const accWebhooks = (app: Express) => { .parse(req.body?.payload) const onVersionAdded = onVersionAddedFactory({ - getAccSyncItemByUrn: getAccSyncItemByUrnFactory({ db }), + queryAllAccSyncItems: queryAllAccSyncItemsFactory({ db }), upsertAccSyncItem: upsertAccSyncItemFactory({ db }) }) diff --git a/packages/server/modules/acc/services/cron.ts b/packages/server/modules/acc/services/cron.ts index 3d742a096..903ee256f 100644 --- a/packages/server/modules/acc/services/cron.ts +++ b/packages/server/modules/acc/services/cron.ts @@ -1,5 +1,5 @@ import { - queryAllPendingAccSyncItemsFactory, + queryAllAccSyncItemsFactory, updateAccSyncItemStatusFactory } from '@/modules/acc/repositories/accSyncItems' import type { ScheduleExecution } from '@/modules/core/domain/scheduledTasks/operations' @@ -22,17 +22,19 @@ import { } from '@/modules/core/repositories/tokens' import { createAppTokenFactory } from '@/modules/core/services/tokens' import { TIME_MS } from '@speckle/shared' -import type { AccRegion } from '@/modules/acc/domain/constants' +import { AccSyncItemStatuses, type AccRegion } from '@/modules/acc/domain/constants' import { triggerSyncItemAutomationFactory } from '@/modules/acc/services/automate' -const queryAllPendingAccSyncItems = queryAllPendingAccSyncItemsFactory({ db }) +const queryAllAccSyncItems = queryAllAccSyncItemsFactory({ db }) export const schedulePendingSyncItemsCheck = (deps: { scheduleExecution: ScheduleExecution }) => { const callback = async (_now: Date, { logger }: { logger: Logger }) => { const tokenData = await getToken() - for await (const items of queryAllPendingAccSyncItems()) { + for await (const items of queryAllAccSyncItems({ + filter: { status: AccSyncItemStatuses.pending } + })) { for (const syncItem of items) { const manifest = await getManifestByUrn( { diff --git a/packages/server/modules/acc/services/management.ts b/packages/server/modules/acc/services/management.ts index 2890a5f83..0bc555f6c 100644 --- a/packages/server/modules/acc/services/management.ts +++ b/packages/server/modules/acc/services/management.ts @@ -191,7 +191,7 @@ export const getPaginatedAccSyncItemsFactory = } export type UpdateAccSyncItem = (params: { - syncItem: Pick + syncItem: Partial & Pick }) => Promise export const updateAccSyncItemFactory = diff --git a/packages/server/modules/acc/services/webhooks.ts b/packages/server/modules/acc/services/webhooks.ts index cd03c2a96..bc381973d 100644 --- a/packages/server/modules/acc/services/webhooks.ts +++ b/packages/server/modules/acc/services/webhooks.ts @@ -1,6 +1,6 @@ import { AccSyncItemStatuses } from '@/modules/acc/domain/constants' import type { - GetAccSyncItemByUrn, + QueryAllAccSyncItems, UpsertAccSyncItem } from '@/modules/acc/domain/operations' import { logger } from '@/observability/logging' @@ -13,24 +13,37 @@ type OnVersionAdded = (params: { export const onVersionAddedFactory = (deps: { - getAccSyncItemByUrn: GetAccSyncItemByUrn + queryAllAccSyncItems: QueryAllAccSyncItems upsertAccSyncItem: UpsertAccSyncItem }): OnVersionAdded => async ({ fileLineageUrn, fileVersionUrn, fileVersionIndex }) => { - const syncItem = await deps.getAccSyncItemByUrn({ lineageUrn: fileLineageUrn }) + for await (const syncItems of deps.queryAllAccSyncItems({ + filter: { lineageUrn: fileLineageUrn } + })) { + for (const syncItem of syncItems) { + if (syncItem.accFileVersionIndex > fileVersionIndex) { + logger.warn( + { + syncItemId: syncItem.id, + currentVersion: syncItem.accFileVersionIndex, + incomingVersion: fileVersionIndex, + incomingAccFile: { + fileLineageUrn, + fileVersionUrn, + fileVersionIndex + } + }, + 'Received event for superseded version of sync item {syncItemId} - Current: {currentVersion} Incoming: {incomingVersion}' + ) + continue + } - if (!syncItem) { - logger.warn( - { fileLineageUrn }, - 'Received version added event for file with unknown lineage urn {lineageUrn}' - ) - return + await deps.upsertAccSyncItem({ + ...syncItem, + status: AccSyncItemStatuses.pending, + accFileVersionIndex: fileVersionIndex, + accFileVersionUrn: fileVersionUrn + }) + } } - - await deps.upsertAccSyncItem({ - ...syncItem, - status: AccSyncItemStatuses.pending, - accFileVersionIndex: fileVersionIndex, - accFileVersionUrn: fileVersionUrn - }) } diff --git a/packages/server/modules/core/graph/generated/graphql.ts b/packages/server/modules/core/graph/generated/graphql.ts index c6de87fa1..c9f7a74bf 100644 --- a/packages/server/modules/core/graph/generated/graphql.ts +++ b/packages/server/modules/core/graph/generated/graphql.ts @@ -1081,7 +1081,7 @@ export type CurrencyBasedPrices = { export type DeleteAccSyncItemInput = { id: Scalars['ID']['input']; - projectId: Scalars['ID']['input']; + projectId: Scalars['String']['input']; }; export type DeleteModelInput = { @@ -4144,7 +4144,7 @@ export type TriggeredAutomationsStatus = { export type UpdateAccSyncItemInput = { id: Scalars['ID']['input']; - projectId: Scalars['ID']['input']; + projectId: Scalars['String']['input']; status: AccSyncItemStatus; };