chore(server): gendo IoC #2 - createRenderRequestFactory
This commit is contained in:
@@ -3,6 +3,7 @@ import {
|
||||
BlobStorageItemInput
|
||||
} from '@/modules/blobstorage/domain/types'
|
||||
import { MaybeNullOrUndefined, Nullable } from '@speckle/shared'
|
||||
import type { Readable } from 'stream'
|
||||
|
||||
export type GetBlobs = (params: {
|
||||
streamId?: MaybeNullOrUndefined<string>
|
||||
@@ -30,3 +31,22 @@ export type GetBlobMetadataCollection = (params: {
|
||||
limit?: Nullable<number>
|
||||
cursor?: Nullable<string>
|
||||
}) => Promise<{ blobs: BlobStorageItem[]; cursor: Nullable<string> }>
|
||||
|
||||
export type UploadFileStream = (
|
||||
storeFileStream: (params: {
|
||||
objectKey: string
|
||||
fileStream: Readable | Buffer
|
||||
}) => Promise<{
|
||||
fileHash: string
|
||||
}>,
|
||||
params1: {
|
||||
streamId: string
|
||||
userId: string | undefined
|
||||
},
|
||||
params2: {
|
||||
blobId: string
|
||||
fileName: string
|
||||
fileType: string | undefined
|
||||
fileStream: Readable | Buffer
|
||||
}
|
||||
) => Promise<{ blobId: string; fileName: string; fileHash: string }>
|
||||
|
||||
@@ -2,13 +2,13 @@ import {
|
||||
DeleteBlob,
|
||||
GetBlobMetadata,
|
||||
UpdateBlob,
|
||||
UploadFileStream,
|
||||
UpsertBlob
|
||||
} from '@/modules/blobstorage/domain/operations'
|
||||
import { BlobStorageItem } from '@/modules/blobstorage/domain/types'
|
||||
import { BadRequestError } from '@/modules/shared/errors'
|
||||
import { getFileSizeLimitMB } from '@/modules/shared/helpers/envHelper'
|
||||
import { MaybeAsync } from '@speckle/shared'
|
||||
import { Readable } from 'stream'
|
||||
|
||||
/**
|
||||
* File size limit in bytes
|
||||
@@ -16,20 +16,8 @@ import { Readable } from 'stream'
|
||||
export const getFileSizeLimit = () => getFileSizeLimitMB() * 1024 * 1024
|
||||
|
||||
export const uploadFileStreamFactory =
|
||||
(deps: { upsertBlob: UpsertBlob; updateBlob: UpdateBlob }) =>
|
||||
async (
|
||||
storeFileStream: (params: {
|
||||
objectKey: string
|
||||
fileStream: Readable | Buffer
|
||||
}) => Promise<{ fileHash: string }>,
|
||||
params1: { streamId: string; userId: string | undefined },
|
||||
params2: {
|
||||
blobId: string
|
||||
fileName: string
|
||||
fileType: string | undefined
|
||||
fileStream: Readable | Buffer
|
||||
}
|
||||
) => {
|
||||
(deps: { upsertBlob: UpsertBlob; updateBlob: UpdateBlob }): UploadFileStream =>
|
||||
async (storeFileStream, params1, params2) => {
|
||||
const { streamId, userId } = params1
|
||||
const { blobId, fileName, fileType, fileStream } = params2
|
||||
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
import { GendoAiRenderInput } from '@/modules/core/graph/generated/graphql'
|
||||
import { GendoAIRender } from '@/modules/gendo/domain/types'
|
||||
import { NullableKeysToOptional } from '@speckle/shared'
|
||||
import { SetOptional } from 'type-fest'
|
||||
|
||||
export type StoreRender = (
|
||||
input: NullableKeysToOptional<SetOptional<GendoAIRender, 'createdAt' | 'updatedAt'>>
|
||||
) => Promise<GendoAIRender>
|
||||
|
||||
export type CreateRenderRequest = (
|
||||
input: GendoAiRenderInput & {
|
||||
userId: string
|
||||
}
|
||||
) => Promise<GendoAIRender>
|
||||
@@ -0,0 +1,3 @@
|
||||
import { GendoAIRenderRecord } from '@/modules/gendo/helpers/types'
|
||||
|
||||
export type GendoAIRender = GendoAIRenderRecord
|
||||
@@ -2,26 +2,39 @@ import { authorizeResolver } from '@/modules/shared'
|
||||
import { Resolvers } from '@/modules/core/graph/generated/graphql'
|
||||
import { Roles } from '@speckle/shared'
|
||||
import {
|
||||
getGendoAIAPIEndpoint,
|
||||
getGendoAIKey,
|
||||
getServerOrigin
|
||||
} from '@/modules/shared/helpers/envHelper'
|
||||
import {
|
||||
createGendoAIRenderRequest,
|
||||
createRenderRequestFactory,
|
||||
getGendoAIRenderRequest,
|
||||
getGendoAIRenderRequests
|
||||
} from '@/modules/gendo/services'
|
||||
import crs from 'crypto-random-string'
|
||||
import {
|
||||
ProjectSubscriptions,
|
||||
filteredSubscribe
|
||||
filteredSubscribe,
|
||||
publish
|
||||
} from '@/modules/shared/utils/subscriptions'
|
||||
import {
|
||||
getRateLimitResult,
|
||||
isRateLimitBreached
|
||||
} from '@/modules/core/services/ratelimiter'
|
||||
import { RateLimitError } from '@/modules/core/errors/ratelimit'
|
||||
import { GendoRenderRequestError } from '@/modules/gendo/errors/main'
|
||||
import { uploadFileStreamFactory } from '@/modules/blobstorage/services/management'
|
||||
import {
|
||||
updateBlobFactory,
|
||||
upsertBlobFactory
|
||||
} from '@/modules/blobstorage/repositories'
|
||||
import { storeFileStream } from '@/modules/blobstorage/objectStorage'
|
||||
import { storeRenderFactory } from '@/modules/gendo/repositories'
|
||||
import { db } from '@/db/knex'
|
||||
|
||||
const createRenderRequest = createRenderRequestFactory({
|
||||
uploadFileStream: uploadFileStreamFactory({
|
||||
upsertBlob: upsertBlobFactory({ db }),
|
||||
updateBlob: updateBlobFactory({ db })
|
||||
}),
|
||||
storeFileStream,
|
||||
storeRender: storeRenderFactory({ db }),
|
||||
publish,
|
||||
fetch
|
||||
})
|
||||
|
||||
export = {
|
||||
Version: {
|
||||
@@ -58,44 +71,11 @@ export = {
|
||||
throw new RateLimitError(rateLimitResult)
|
||||
}
|
||||
|
||||
const endpoint = getGendoAIAPIEndpoint() as string
|
||||
const bearer = getGendoAIKey() as string
|
||||
const webhookUrl = `${getServerOrigin()}/api/thirdparty/gendo`
|
||||
|
||||
// TODO Fire off request to gendo api & get generationId, create record in db. Note: use gendo api key from env
|
||||
const gendoRequestBody = {
|
||||
userId: ctx.userId,
|
||||
depthMap: args.input.baseImage,
|
||||
prompt: args.input.prompt,
|
||||
webhookUrl
|
||||
}
|
||||
|
||||
const response = await fetch(endpoint, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
Authorization: `Bearer ${bearer}`
|
||||
},
|
||||
body: JSON.stringify(gendoRequestBody)
|
||||
await createRenderRequest({
|
||||
...args.input,
|
||||
userId: ctx.userId!
|
||||
})
|
||||
|
||||
const status = response.status
|
||||
|
||||
if (status === 200) {
|
||||
const body = (await response.json()) as { status: string; generationId: string }
|
||||
await createGendoAIRenderRequest({
|
||||
...args.input,
|
||||
userId: ctx.userId as string,
|
||||
status: body.status,
|
||||
gendoGenerationId: body.generationId,
|
||||
id: crs({ length: 10 })
|
||||
})
|
||||
} else {
|
||||
const body = await response.json().catch((e) => ({ error: `${e}` }))
|
||||
throw new GendoRenderRequestError('Failed to enqueue gendo render.', {
|
||||
info: { body }
|
||||
})
|
||||
}
|
||||
return true
|
||||
}
|
||||
},
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
import { GendoAIRenders } from '@/modules/core/dbSchema'
|
||||
import { StoreRender } from '@/modules/gendo/domain/operations'
|
||||
import { GendoAIRenderRecord } from '@/modules/gendo/helpers/types'
|
||||
import { Knex } from 'knex'
|
||||
import { pick } from 'lodash'
|
||||
|
||||
const tables = {
|
||||
gendoAIRenders: (db: Knex) => db<GendoAIRenderRecord>(GendoAIRenders.name)
|
||||
}
|
||||
|
||||
export const storeRenderFactory =
|
||||
(deps: { db: Knex }): StoreRender =>
|
||||
async (input) => {
|
||||
const [newRec] = await tables
|
||||
.gendoAIRenders(deps.db)
|
||||
.insert(pick(input, GendoAIRenders.withoutTablePrefix.cols))
|
||||
.returning('*')
|
||||
return newRec
|
||||
}
|
||||
@@ -1,8 +1,11 @@
|
||||
import crs from 'crypto-random-string'
|
||||
import { GendoAIRenders, knex } from '@/modules/core/dbSchema'
|
||||
import { GendoAiRenderInput } from '@/modules/core/graph/generated/graphql'
|
||||
import { GendoAIRenderRecord } from '@/modules/gendo/helpers/types'
|
||||
import { ProjectSubscriptions, publish } from '@/modules/shared/utils/subscriptions'
|
||||
import {
|
||||
ProjectSubscriptions,
|
||||
publish,
|
||||
PublishSubscription
|
||||
} from '@/modules/shared/utils/subscriptions'
|
||||
import { Merge } from 'type-fest'
|
||||
import { storeFileStream } from '@/modules/blobstorage/objectStorage'
|
||||
import { uploadFileStreamFactory } from '@/modules/blobstorage/services/management'
|
||||
@@ -11,49 +14,97 @@ import {
|
||||
upsertBlobFactory
|
||||
} from '@/modules/blobstorage/repositories'
|
||||
import { db } from '@/db/knex'
|
||||
import { CreateRenderRequest, StoreRender } from '@/modules/gendo/domain/operations'
|
||||
import { UploadFileStream } from '@/modules/blobstorage/domain/operations'
|
||||
import {
|
||||
getGendoAIAPIEndpoint,
|
||||
getGendoAIKey,
|
||||
getServerOrigin
|
||||
} from '@/modules/shared/helpers/envHelper'
|
||||
import { GendoRenderRequestError } from '@/modules/gendo/errors/main'
|
||||
|
||||
const uploadFileStream = uploadFileStreamFactory({
|
||||
upsertBlob: upsertBlobFactory({ db }),
|
||||
updateBlob: updateBlobFactory({ db })
|
||||
})
|
||||
|
||||
export async function createGendoAIRenderRequest(
|
||||
input: GendoAiRenderInput & {
|
||||
userId: string
|
||||
status: string
|
||||
id: string
|
||||
gendoGenerationId?: string
|
||||
}
|
||||
) {
|
||||
const baseImageBuffer = Buffer.from(
|
||||
input.baseImage.replace(/^data:image\/\w+;base64,/, ''),
|
||||
'base64'
|
||||
)
|
||||
export const createRenderRequestFactory =
|
||||
(deps: {
|
||||
uploadFileStream: UploadFileStream
|
||||
storeFileStream: typeof storeFileStream
|
||||
storeRender: StoreRender
|
||||
publish: PublishSubscription
|
||||
fetch: typeof fetch
|
||||
}): CreateRenderRequest =>
|
||||
async (input) => {
|
||||
const endpoint = getGendoAIAPIEndpoint()
|
||||
const bearer = getGendoAIKey() as string
|
||||
const webhookUrl = `${getServerOrigin()}/api/thirdparty/gendo`
|
||||
|
||||
const blobId = crs({ length: 10 })
|
||||
await uploadFileStream(
|
||||
storeFileStream,
|
||||
{ streamId: input.projectId, userId: input.userId },
|
||||
{
|
||||
blobId,
|
||||
fileName: `gendo_base_image_${blobId}.png`,
|
||||
fileType: 'png',
|
||||
fileStream: baseImageBuffer
|
||||
// TODO: Fn handles too many concerns, refactor (e.g. the client fetch call)
|
||||
// TODO: Fire off request to gendo api & get generationId, create record in db. Note: use gendo api key from env
|
||||
const gendoRequestBody = {
|
||||
userId: input.userId,
|
||||
depthMap: input.baseImage,
|
||||
prompt: input.prompt,
|
||||
webhookUrl
|
||||
}
|
||||
)
|
||||
|
||||
input.baseImage = blobId
|
||||
const response = await fetch(endpoint, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
Authorization: `Bearer ${bearer}`
|
||||
},
|
||||
body: JSON.stringify(gendoRequestBody)
|
||||
})
|
||||
|
||||
const [newRecord] = await GendoAIRenders.knex().insert(input, '*')
|
||||
const status = response.status
|
||||
if (status !== 200) {
|
||||
const body = await response.json().catch((e) => ({ error: `${e}` }))
|
||||
throw new GendoRenderRequestError('Failed to enqueue gendo render.', {
|
||||
info: { body }
|
||||
})
|
||||
}
|
||||
|
||||
publish(ProjectSubscriptions.ProjectVersionGendoAIRenderCreated, {
|
||||
projectVersionGendoAIRenderCreated: newRecord
|
||||
})
|
||||
const gendoResponseBody = (await response.json()) as {
|
||||
status: string
|
||||
generationId: string
|
||||
}
|
||||
const baseImageBuffer = Buffer.from(
|
||||
input.baseImage.replace(/^data:image\/\w+;base64,/, ''),
|
||||
'base64'
|
||||
)
|
||||
|
||||
// TODO: Schedule a timeout fail after x minutes
|
||||
const blobId = crs({ length: 10 })
|
||||
await deps.uploadFileStream(
|
||||
deps.storeFileStream,
|
||||
{ streamId: input.projectId, userId: input.userId },
|
||||
{
|
||||
blobId,
|
||||
fileName: `gendo_base_image_${blobId}.png`,
|
||||
fileType: 'png',
|
||||
fileStream: baseImageBuffer
|
||||
}
|
||||
)
|
||||
|
||||
return newRecord as GendoAIRenderRecord
|
||||
}
|
||||
input.baseImage = blobId
|
||||
|
||||
const newRecord = await deps.storeRender({
|
||||
...input,
|
||||
status: gendoResponseBody.status,
|
||||
gendoGenerationId: gendoResponseBody.generationId,
|
||||
id: crs({ length: 10 })
|
||||
})
|
||||
|
||||
deps.publish(ProjectSubscriptions.ProjectVersionGendoAIRenderCreated, {
|
||||
projectVersionGendoAIRenderCreated: newRecord
|
||||
})
|
||||
|
||||
// TODO: Schedule a timeout fail after x minutes
|
||||
|
||||
return newRecord
|
||||
}
|
||||
|
||||
export async function updateGendoAIRenderRequest(
|
||||
input: Partial<{ status: string; responseImage: string }> & {
|
||||
|
||||
Reference in New Issue
Block a user