push work on to database, simplify application logic

This commit is contained in:
Iain Sproat
2025-03-31 12:03:43 +01:00
parent 3af5ad16fe
commit f75b77e899
6 changed files with 65 additions and 100 deletions
@@ -1,7 +1,4 @@
import {
FileUploadConvertedStatus,
FileUploadRecord
} from '@/modules/fileuploads/helpers/types'
import { FileUploadRecord } from '@/modules/fileuploads/helpers/types'
import { SaveUploadFileInput } from '@/modules/fileuploads/repositories/fileUploads'
import { Optional } from '@speckle/shared'
@@ -11,17 +8,10 @@ export type GetFileInfo = (args: {
export type SaveUploadFile = (args: SaveUploadFileInput) => Promise<FileUploadRecord>
export type UpdateUploadFile = (args: {
fileId: string
newStatus: FileUploadConvertedStatus
}) => Promise<FileUploadRecord>
export type UpdateFileStatusAndNotify = (params: {
streamId: string
branchName: string
fileId: string
newStatus: FileUploadConvertedStatus
}) => Promise<void>
export type GarbageCollectPendingUploadedFiles = (args: {
timeoutThresholdSeconds: number
}) => Promise<FileUploadRecord[]>
export type GetAllPendingUploads = (
options?: Partial<{ limit: number }>
) => Promise<FileUploadRecord[]>
export type NotifyChangeInFileStatus = (params: {
file: FileUploadRecord
}) => Promise<void>
+7 -7
View File
@@ -1,5 +1,5 @@
import cron from 'node-cron'
import { updateUploadAndNotifyFactory } from '@/modules/fileuploads/services/management'
import { notifyChangeInFileStatus } from '@/modules/fileuploads/services/management'
import { moduleLogger } from '@/observability/logging'
import {
onFileImportProcessedFactory,
@@ -7,9 +7,8 @@ import {
parseMessagePayload
} from '@/modules/fileuploads/services/resultListener'
import {
getAllPendingUploadsFactory,
getFileInfoFactory,
updateUploadFileFactory
expireOldPendingUploadsFactory,
getFileInfoFactory
} from '@/modules/fileuploads/repositories/fileUploads'
import { db } from '@/db/knex'
import { publish } from '@/modules/shared/utils/subscriptions'
@@ -46,10 +45,11 @@ const scheduleFileImportExpiry = async ({
for (const projectDb of [db, ...regionClients]) {
fileImportExpiryHandlers.push(
manageFileImportExpiryFactory({
getPendingUploads: getAllPendingUploadsFactory({ db: projectDb }),
updateUploadStatus: updateUploadAndNotifyFactory({
garbageCollectExpiredPendingUploads: expireOldPendingUploadsFactory({
db: projectDb
}),
notifyUploadStatus: notifyChangeInFileStatus({
getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }),
updateUploadFile: updateUploadFileFactory({ db: projectDb }),
publish
})
})
@@ -1,9 +1,8 @@
import { Branches, FileUploads, knex } from '@/modules/core/dbSchema'
import {
GetAllPendingUploads,
GarbageCollectPendingUploadedFiles,
GetFileInfo,
SaveUploadFile,
UpdateUploadFile
SaveUploadFile
} from '@/modules/fileuploads/domain/operations'
import {
FileUploadConvertedStatus,
@@ -80,35 +79,27 @@ export const saveUploadFileFactory =
return newRecord as FileUploadRecord
}
export const updateUploadFileFactory =
(deps: { db: Knex }): UpdateUploadFile =>
async ({ fileId, newStatus }) => {
const [updatedRecord] = await tables
.fileUploads(deps.db)
.where({ [FileUploads.col.id]: fileId })
export const expireOldPendingUploadsFactory =
(deps: { db: Knex }): GarbageCollectPendingUploadedFiles =>
async (params: { timeoutThresholdSeconds: number }) => {
const updatedRows = await deps
.db(FileUploads.name)
.whereIn(FileUploads.withoutTablePrefix.col.convertedStatus, [
FileUploadConvertedStatus.Converting,
FileUploadConvertedStatus.Queued
])
.andWhere(
FileUploads.withoutTablePrefix.col.uploadDate,
'<',
deps.db.raw(`now() - interval '${params.timeoutThresholdSeconds} seconds'`)
)
.update({
[FileUploads.withoutTablePrefix.col.convertedStatus]: newStatus
[FileUploads.withoutTablePrefix.col.convertedStatus]:
FileUploadConvertedStatus.Error
})
.returning<FileUploadRecord[]>('*')
return updatedRecord
}
export const getAllPendingUploadsFactory =
(deps: { db: Knex }): GetAllPendingUploads =>
async (options) => {
const { limit } = options || {}
const q = tables
.fileUploads(deps.db)
.whereIn(FileUploads.col.convertedStatus, [
FileUploadConvertedStatus.Queued,
FileUploadConvertedStatus.Converting
])
.orderBy(FileUploads.col.uploadDate, 'asc')
if (limit) {
q.limit(limit)
}
return await q
return updatedRows
}
const getPendingUploadsBaseQueryFactory =
@@ -6,8 +6,7 @@ import {
} from '@/modules/core/graph/generated/graphql'
import {
SaveUploadFile,
UpdateFileStatusAndNotify,
UpdateUploadFile
NotifyChangeInFileStatus
} from '@/modules/fileuploads/domain/operations'
import { SaveUploadFileInput } from '@/modules/fileuploads/repositories/fileUploads'
import {
@@ -56,43 +55,43 @@ export const insertNewUploadAndNotifyFactory =
})
}
export const updateUploadAndNotifyFactory =
export const notifyChangeInFileStatus =
(deps: {
getStreamBranchByName: GetStreamBranchByName
updateUploadFile: UpdateUploadFile
publish: PublishSubscription
}): UpdateFileStatusAndNotify =>
}): NotifyChangeInFileStatus =>
async (params) => {
const branch = await deps.getStreamBranchByName(params.streamId, params.branchName)
const file = await deps.updateUploadFile(params)
const { file } = params
const { id: fileId, streamId, branchName } = file
const branch = await deps.getStreamBranchByName(streamId, branchName)
if (!branch) {
await deps.publish(FileImportSubscriptions.ProjectPendingModelsUpdated, {
projectPendingModelsUpdated: {
id: file.id,
id: fileId,
type: ProjectPendingModelsUpdatedMessageType.Updated,
model: file
},
projectId: file.streamId
projectId: streamId
})
} else {
await deps.publish(FileImportSubscriptions.ProjectPendingVersionsUpdated, {
projectPendingVersionsUpdated: {
id: file.id,
id: fileId,
type: ProjectPendingVersionsUpdatedMessageType.Updated,
version: file
},
projectId: file.streamId,
branchName: file.branchName
projectId: streamId,
branchName
})
}
await deps.publish(FileImportSubscriptions.ProjectFileImportUpdated, {
projectFileImportUpdated: {
id: file.id,
id: fileId,
type: ProjectFileImportUpdatedMessageType.Created,
upload: file
},
projectId: file.streamId
projectId: streamId
})
}
@@ -1,39 +1,24 @@
import { Logger } from '@/observability/logging'
import {
GetAllPendingUploads,
UpdateFileStatusAndNotify
GarbageCollectPendingUploadedFiles,
NotifyChangeInFileStatus
} from '@/modules/fileuploads/domain/operations'
import { FileUploadConvertedStatus } from '@/modules/fileuploads/helpers/types'
export const manageFileImportExpiryFactory = (deps: {
getPendingUploads: GetAllPendingUploads
updateUploadStatus: UpdateFileStatusAndNotify
garbageCollectExpiredPendingUploads: GarbageCollectPendingUploadedFiles
notifyUploadStatus: NotifyChangeInFileStatus
}) => {
const {} = deps
return async (params: { logger: Logger; timeoutThresholdSeconds: number }) => {
const { logger } = params
const now = new Date().getTime()
logger.info('Managing file import expiry')
// Logic to manage file import expiry goes here
// check for expired file imports
// if over some timeout threshold, move them into an error state
// and notify the user
const pendingUploads = await deps.getPendingUploads({
limit: 100
const { logger, timeoutThresholdSeconds } = params
const updatedUploads = await deps.garbageCollectExpiredPendingUploads({
timeoutThresholdSeconds
})
logger.info(`Found ${pendingUploads.length} pending uploads`)
for (const upload of pendingUploads) {
const uploadDate = new Date(upload.uploadDate).getTime()
const diff = now - uploadDate
if (diff > 1000 * params.timeoutThresholdSeconds) {
logger.info(`Marking upload ${upload.id} as error`)
await deps.updateUploadStatus({
streamId: upload.streamId,
branchName: upload.branchName,
fileId: upload.id,
newStatus: FileUploadConvertedStatus.Error
})
}
logger.info(`Expired ${updatedUploads.length} pending uploads`)
for (const upload of updatedUploads) {
await deps.notifyUploadStatus({
file: upload
})
}
}
}
@@ -45,14 +45,13 @@ import { sendEmail } from '@/modules/emails/services/sending'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import { manageFileImportExpiryFactory } from '@/modules/fileuploads/services/tasks'
import {
getAllPendingUploadsFactory,
expireOldPendingUploadsFactory,
getFileInfoFactory,
saveUploadFileFactory,
updateUploadFileFactory
saveUploadFileFactory
} from '@/modules/fileuploads/repositories/fileUploads'
import {
insertNewUploadAndNotifyFactory,
updateUploadAndNotifyFactory
notifyChangeInFileStatus
} from '@/modules/fileuploads/services/management'
import { publish } from '@/modules/shared/utils/subscriptions'
import { testLogger as logger } from '@/observability/logging'
@@ -122,10 +121,11 @@ const createUser = createUserFactory({
})
const garbageCollector = manageFileImportExpiryFactory({
getPendingUploads: getAllPendingUploadsFactory({ db }),
updateUploadStatus: updateUploadAndNotifyFactory({
garbageCollectExpiredPendingUploads: expireOldPendingUploadsFactory({
db
}),
notifyUploadStatus: notifyChangeInFileStatus({
getStreamBranchByName: getStreamBranchByNameFactory({ db }),
updateUploadFile: updateUploadFileFactory({ db }),
publish
})
})