chore(fileimport service): improve error handling

This commit is contained in:
Iain Sproat
2025-02-11 12:17:39 +00:00
parent 10bda8b26a
commit 85fa5b603a
57 changed files with 830 additions and 301153 deletions
+6
View File
@@ -0,0 +1,6 @@
FILE_IMPORT_TIME_LIMIT_MIN='10'
MAX_OBJECT_SIZE_MB='10'
POSTGRES_MAX_CONNECTIONS_FILE_IMPORT_SERVICE='1'
POSTGRES_CONNECTION_ACQUIRE_TIMEOUT_MILLIS='16000'
POSTGRES_CONNECTION_CREATE_TIMEOUT_MILLIS='5000'
FF_WORKSPACES_MULTI_REGION_ENABLED=false
-18
View File
@@ -1,18 +0,0 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch via Yarn",
"request": "launch",
"console": "integratedTerminal",
"runtimeArgs": ["dev"],
"runtimeExecutable": "yarn",
"skipFiles": ["<node_internals>/**"],
"envFile": "${workspaceFolder}/.env",
"type": "node"
}
]
}
+3 -3
View File
@@ -2,7 +2,7 @@ ARG NODE_ENV=production
FROM mcr.microsoft.com/dotnet/sdk:8.0-noble AS dotnet-build-stage
WORKDIR /app
COPY packages/fileimport-service/ifc-dotnet .
COPY packages/fileimport-service/src/ifc-dotnet .
RUN dotnet publish ifc-converter.csproj -c Release -o output/
@@ -69,8 +69,8 @@ ENV PYTHON_BINARY_PATH=${PYTHON_BINARY_PATH}
ARG DOTNET_BINARY_PATH=/usr/bin/dotnet
ENV DOTNET_BINARY_PATH=${DOTNET_BINARY_PATH}
COPY --link --from=dotnet-build-stage /app/output packages/fileimport-service/ifc-dotnet
COPY --link --from=dotnet-build-stage /app/output packages/fileimport-service/src/ifc-dotnet
WORKDIR /speckle-server/packages/fileimport-service
ENTRYPOINT [ "tini", "--", "node", "--no-experimental-fetch", "src/daemon.js"]
ENTRYPOINT [ "tini", "--", "/nodejs/bin/node", "--loader=./dist/src/aliasLoader.js", "bin/www.js" ]
+17 -5
View File
@@ -1,11 +1,23 @@
# `ifc-parser`
# Database Monitor
> TODO: description
Responsible for querying all databases and generating metrics.
## Usage
Metrics are available at `/metrics` endpoint and are in Prometheus format.
## Development
```bash
yarn dev
```
const ifcParser = require('ifc-parser');
// TODO: DEMONSTRATE API
## Databases with self-signed certificates
Add the self-signed CA certificate to a file at `packages/monitor-deployment/ca-certificate.crt`
Run `NODE_EXTRA_CA_CERTS=./ca-certificate.crt yarn dev` or `NODE_EXTRA_CA_CERTS=./ca-certificate.crt yarn start`
## Production
```bash
yarn start
```
+47 -6
View File
@@ -1,20 +1,61 @@
import { baseConfigs, globals } from '../../eslint.config.mjs'
import tseslint from 'typescript-eslint'
import {
baseConfigs,
getESMDirname,
globals,
prettierConfig
} from '../../eslint.config.mjs'
/**
* @type {Array<import('eslint').Linter.FlatConfig>}
*/
const configs = [
...baseConfigs,
{
ignores: ['**/ifc/**', '**/obj/**', '**/stl/**']
ignores: ['dist', 'public', 'docs']
},
{
files: ['**/*.js'],
ignores: ['**/*.mjs'],
languageOptions: {
sourceType: 'module',
globals: {
...globals.node
}
}
},
{
files: ['bin/www'],
languageOptions: {
sourceType: 'module',
globals: {
...globals.node
}
}
},
...tseslint.configs.recommendedTypeChecked.map((c) => ({
...c,
files: [...(c.files || []), '**/*.ts', '**/*.d.ts']
})),
{
files: ['**/*.ts', '**/*.d.ts'],
languageOptions: {
parserOptions: {
tsconfigRootDir: getESMDirname(import.meta.url),
project: './tsconfig.json'
}
},
rules: {
'@typescript-eslint/no-explicit-any': 'error',
'@typescript-eslint/no-unsafe-return': 'error'
}
},
{
files: ['**/*.spec.{js,ts}'],
languageOptions: {
globals: {
...globals.node
}
}
}
},
prettierConfig
]
export default configs
@@ -1,4 +0,0 @@
{
"extends": "../../jsconfig.base.json",
"include": ["src", "ifc"]
}
@@ -0,0 +1,34 @@
{
"main": {
"postgres": {
"connectionUri": "postgresql://speckle:speckle@127.0.0.1:5432/speckle",
"privateConnectionUri": "postgresql://speckle:speckle@postgres:5432/speckle",
"databaseName": "speckle"
},
"blobStorage": {
"accessKey": "minioadmin",
"secretKey": "minioadmin",
"bucket": "speckle-server",
"createBucketIfNotExists": true,
"endpoint": "http://127.0.0.1:9000",
"s3Region": "us-east-1"
}
},
"regions": {
"region1": {
"postgres": {
"connectionUri": "postgresql://speckle:speckle@127.0.0.1:5401/speckle",
"privateConnectionUri": "postgresql://speckle:speckle@postgres-region1:5432/speckle",
"databaseName": "speckle"
},
"blobStorage": {
"accessKey": "minioadmin",
"secretKey": "minioadmin",
"bucket": "speckle-server",
"createBucketIfNotExists": true,
"endpoint": "http://127.0.0.1:9020",
"s3Region": "us-east-1"
}
}
}
}
@@ -1,178 +0,0 @@
import sys, os
import json
from specklepy.objects import Base
from specklepy.objects.other import RenderMaterial
from specklepy.objects.geometry import Mesh
from specklepy.transports.server import ServerTransport
from specklepy.api.client import SpeckleClient
from specklepy.api import operations
from obj_file import ObjFile
import structlog
from logging import INFO, basicConfig
basicConfig(format="%(message)s", stream=sys.stdout, level=INFO)
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.UnicodeDecoder(),
structlog.processors.CallsiteParameterAdder(
{
structlog.processors.CallsiteParameter.FILENAME,
structlog.processors.CallsiteParameter.FUNC_NAME,
structlog.processors.CallsiteParameter.LINENO,
}
),
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(INFO),
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
LOG = structlog.get_logger()
DEFAULT_BRANCH = "uploads"
def convert_material(obj_mat):
speckle_mat = RenderMaterial()
speckle_mat.name = obj_mat["name"]
if "diffuse" in obj_mat:
argb = [
1,
] + obj_mat["diffuse"]
speckle_mat.diffuse = int.from_bytes(
[int(val * 255) for val in argb], byteorder="big", signed=True
)
if "dissolved" in obj_mat:
speckle_mat.opacity = obj_mat["dissolved"]
return speckle_mat
def import_obj():
(
file_path,
tmp_results_path,
_,
stream_id,
branch_name,
commit_message,
_,
_,
_,
) = sys.argv[1:]
LOG.info("ImportOBJ argv[1:]:%s", sys.argv[1:])
# Parse input
obj = ObjFile(file_path)
LOG.info(
"Parsed obj with %s faces (%s vertices)", len(obj.faces), len(obj.vertices) * 3
)
speckle_root = Base()
speckle_root["@objects"] = []
for objname in obj.objects:
objLogger = LOG.bind(object_name=objname)
objLogger.info("Converting object")
speckle_obj = Base()
speckle_obj.name = objname
speckle_obj["@displayValue"] = []
speckle_root["@objects"].append(speckle_obj)
for obj_mesh in obj.objects[objname]:
speckle_vertices = [
coord for point in obj_mesh["vertices"] for coord in point
]
speckle_faces = []
for obj_face in obj_mesh["faces"]:
if len(obj_face) == 3:
speckle_faces.append(0)
elif len(obj_face) == 4:
speckle_faces.append(1)
else:
speckle_faces.append(len(obj_face))
speckle_faces.extend(obj_face)
has_vertex_colors = False
for vc in obj_mesh["vertex_colors"]:
if vc is not None:
has_vertex_colors = True
colors = []
if has_vertex_colors:
for vc in obj_mesh["vertex_colors"]:
if vc is None:
r, g, b = (1.0, 1.0, 1.0)
else:
r, g, b = vc
argb = (1.0, r, g, b)
color = int.from_bytes(
[int(val * 255) for val in argb], byteorder="big", signed=True
)
colors.append(color)
speckle_mesh = Mesh(
vertices=speckle_vertices,
faces=speckle_faces,
colors=colors,
textureCoordinates=[],
)
obj_material = obj_mesh["material"]
if obj_material:
speckle_mesh["renderMaterial"] = convert_material(obj_material)
speckle_obj["@displayValue"].append(speckle_mesh)
# Commit
client = SpeckleClient(
host=os.getenv("SPECKLE_SERVER_URL", "127.0.0.1:3000"), use_ssl=False
)
client.authenticate_with_token(os.environ["USER_TOKEN"])
if not client.branch.get(stream_id, branch_name):
client.branch.create(
stream_id,
branch_name,
"File upload branch" if branch_name == "uploads" else "",
)
transport = ServerTransport(client=client, stream_id=stream_id)
id = operations.send(
base=speckle_root, transports=[transport], use_default_cache=False
)
commit_id = client.commit.create(
stream_id=stream_id,
object_id=id,
branch_name=(branch_name or DEFAULT_BRANCH),
message=(commit_message or "OBJ file upload"),
source_application="OBJ",
)
return commit_id, tmp_results_path
if __name__ == "__main__":
from pathlib import Path
try:
commit_id, tmp_results_path = import_obj()
if not commit_id:
raise Exception("Can't create commit")
if isinstance(commit_id, Exception):
raise commit_id
results = {"success": True, "commitId": commit_id}
except Exception as ex:
LOG.exception(ex)
results = {"success": False, "error": str(ex)}
Path(tmp_results_path).write_text(json.dumps(results))
@@ -1,82 +0,0 @@
import os
import structlog
LOG = structlog.get_logger()
class MtlFileCollection(object):
def __init__(self, base_dir):
self.base_dir = base_dir
self.logged_unsupported = set()
self.materials = {}
self.crt_mat = None
def ensure_mat(self, directive):
if (
self.crt_mat is None
and f"no_mat_{directive}" not in self.logged_unsupported
):
LOG.info("Directive found outside material definition:%s", directive)
self.logged_unsupported.add(f"no_mat_{directive}")
return self.crt_mat is not None
def mtllib(self, fpath):
fpath = os.path.join(self.base_dir, os.path.basename(fpath))
if not os.path.isfile(fpath):
LOG.error("Missing MTL file:%s", fpath)
return
with open(fpath, "r") as f:
while True:
line = f.readline()
if not line:
break
if not line.strip() or line.startswith("#"):
continue
parts = line.strip().split(" ")
if parts[0] == "newmtl":
mat_name = " ".join(parts[1:])
self.crt_mat = {"name": mat_name}
self.materials[mat_name] = self.crt_mat
elif parts[0] == "Ka":
if self.ensure_mat("Ka"):
self.crt_mat["ambient"] = [float(x) for x in parts[1:]]
elif parts[0] == "Kd":
if self.ensure_mat("Kd"):
self.crt_mat["diffuse"] = [float(x) for x in parts[1:]]
elif parts[0] == "Ks":
if self.ensure_mat("Ks"):
self.crt_mat["specular_color"] = [float(x) for x in parts[1:]]
elif parts[0] == "Ns":
if self.ensure_mat("Ns"):
self.crt_mat["specular_exponent"] = float(parts[1])
elif parts[0] == "d":
if self.ensure_mat("d"):
self.crt_mat["dissolved"] = float(parts[1])
elif parts[0] == "Tr":
if self.ensure_mat("Tr"):
self.crt_mat["dissolved"] = 1.0 - float(parts[1])
elif parts[0] == "Ni":
if self.ensure_mat("Ni"):
self.crt_mat["refraction_index"] = float(parts[1])
elif parts[0] == "illum":
if self.ensure_mat("illum"):
self.crt_mat["illumination_mode"] = int(parts[1])
elif parts[0] == "Pr":
if self.ensure_mat("Pr"):
self.crt_mat["roughness"] = float(parts[1])
elif parts[0] == "Pm":
if self.ensure_mat("Pm"):
self.crt_mat["metallic"] = float(parts[1])
elif parts[0] == "Ke":
if self.ensure_mat("Ke"):
self.crt_mat["emissive"] = [float(x) for x in parts[1:]]
else:
if parts[0] not in self.logged_unsupported:
LOG.warn("Unsupported MTL directive: %s", parts[0])
self.logged_unsupported.add(parts[0])
self.crt_mat = None
def get_material(self, name):
return self.materials.get(name, None)
-124
View File
@@ -1,124 +0,0 @@
from mtl_file_collection import MtlFileCollection
import os
import structlog
LOG = structlog.get_logger()
class ObjFile(object):
def __init__(self, file_path) -> None:
self.logged_unsupported = set()
self.mtl_files = MtlFileCollection(os.path.dirname(file_path))
self.crt_object = ""
self.crt_mtl = ""
self.vertices = []
self.vertex_colors = []
self.faces = []
# Constructed in the post-process phase
self.objects = {}
with open(file_path, "r") as f:
while True:
line = f.readline()
if not line:
break
if not line.strip() or line.startswith("#"):
continue
parts = line.strip().split(" ")
if parts[0] == "v":
self.on_v(parts[1:])
elif parts[0] == "l":
self.on_l(parts[1:])
elif parts[0] == "f":
self.on_f(parts[1:])
elif parts[0] == "mtllib":
self.mtl_files.mtllib(" ".join(parts[1:]))
elif parts[0] == "usemtl":
self.crt_mtl = " ".join(parts[1:])
elif parts[0] == "o":
self.crt_object = parts[1]
else:
if parts[0] not in self.logged_unsupported:
LOG.warn("Unsupported OBJ directive: " + parts[0])
self.logged_unsupported.add(parts[0])
self.post_process()
def flatten_vertices(self):
return [coord for point in self.vertices for coord in point]
def on_v(self, params):
r, g, b = None, None, None
w = 1.0
if len(params) == 3:
x, y, z = [float(param) for param in params]
if len(params) == 4:
x, y, z, w = [float(param) for param in params]
if len(params) == 6:
x, y, z, r, g, b = [float(param) for param in params]
self.vertices.append((x, z, y))
if r is None or g is None or b is None:
self.vertex_colors.append(None)
else:
self.vertex_colors.append((r, g, b))
def on_l(self, params):
# TODO: handle lines
pass
def on_f(self, params):
indices = []
for param in params:
# TODO: use texture coordinate index / use vertex normal index?
v_index = int(param.split("/")[0])
# If an index is positive then it refers to the offset in that vertex list, starting at 1.
# If an index is negative then it relatively refers to the end of the vertex list, -1 referring to the last element.
if v_index > 0:
v_index -= 1
indices.append(v_index)
self.faces.append(
{"indices": indices, "object": self.crt_object, "mtl": self.crt_mtl}
)
def post_process(self):
# Step 1: group into object_id/material_id/[faces_with_global_indices]
objects = {}
for face in self.faces:
if face["object"] not in objects:
objects[face["object"]] = {}
obj = objects[face["object"]]
if face["mtl"] not in obj:
obj[face["mtl"]] = []
obj[face["mtl"]].append(face["indices"])
# Step 2: construct final structure: object_id / [{material, local_vertices, vertex_colors, faces_with_local_indices}]
for object in objects:
self.objects[object] = []
for mtl in objects[object].keys():
material = self.mtl_files.get_material(mtl)
vertices = []
vertex_colors = []
faces = []
v_global2local_id = {}
for face in objects[object][mtl]:
for global_v in face:
if global_v not in v_global2local_id:
v_global2local_id[global_v] = len(vertices)
vertices.append(self.vertices[global_v])
vertex_colors.append(self.vertex_colors[global_v])
faces.append([v_global2local_id[global_id] for global_id in face])
self.objects[object].append(
{
"material": material,
"vertices": vertices,
"vertex_colors": vertex_colors,
"faces": faces,
}
)
@@ -1,47 +0,0 @@
# Blender MTL File: 'untitled.blend'
# Material Count: 5
newmtl Black
Ns 96.078431
Ka 0.000000 0.000000 0.000000
Kd 0.066825 0.066825 0.066825
Ks 0.500000 0.500000 0.500000
Ni 1.000000
d 1.000000
illum 2
newmtl Material.001
Ns 96.078431
Ka 0.000000 0.000000 0.000000
Kd 0.640000 0.640000 0.640000
Ks 0.500000 0.500000 0.500000
Ni 1.000000
d 1.000000
illum 2
newmtl Material.002
Ns 96.078431
Ka 0.000000 0.000000 0.000000
Kd 0.640000 0.640000 0.640000
Ks 0.500000 0.500000 0.500000
Ni 1.000000
d 1.000000
illum 2
newmtl Material.003
Ns 96.078431
Ka 0.000000 0.000000 0.000000
Kd 0.640000 0.640000 0.640000
Ks 0.500000 0.500000 0.500000
Ni 1.000000
d 1.000000
illum 2
newmtl White
Ns 96.078431
Ka 0.000000 0.000000 0.000000
Kd 0.640000 0.640000 0.640000
Ks 0.500000 0.500000 0.500000
Ni 1.000000
d 1.000000
illum 2
File diff suppressed because it is too large Load Diff
+34 -15
View File
@@ -6,7 +6,8 @@
"author": "Speckle Systems <hello@speckle.systems>",
"homepage": "https://github.com/specklesystems/speckle-server#readme",
"license": "SEE LICENSE IN readme.md",
"main": "daemon.js",
"main": "./bin/www.js",
"type": "module",
"repository": {
"type": "git",
"url": "git+https://github.com/specklesystems/speckle-server.git"
@@ -15,34 +16,52 @@
"node": "^18.19.0"
},
"scripts": {
"dev": "cross-env POSTGRES_URL=postgres://speckle:speckle@127.0.0.1/speckle NODE_ENV=development LOG_PRETTY=true SPECKLE_SERVER_URL=http://127.0.0.1:3000 nodemon --no-experimental-fetch ./src/daemon.js",
"parse:ifc": "node --no-experimental-fetch ./ifc/import_file.js ./ifc/ifcs/steelplates.ifc 33763848d6 2e4bfb467a main File upload: steelplates.ifc",
"lint": "eslint ."
},
"bugs": {
"url": "https://github.com/specklesystems/speckle-server/issues"
"build:tsc:watch": "tsc -p ./tsconfig.build.json --watch",
"run:watch": "NODE_ENV=development LOG_PRETTY=true LOG_LEVEL=debug nodemon --exec \"yarn start\" --trace-deprecation --watch ./bin/www.js --watch ./dist",
"dev": "concurrently \"npm:build:tsc:watch\" \"npm:run:watch\"",
"dev:headed": "yarn dev",
"build:tsc": "rimraf ./dist/src && tsc -p ./tsconfig.build.json",
"build": "yarn build:tsc",
"lint": "yarn lint:tsc && yarn lint:eslint",
"lint:ci": "yarn lint:tsc",
"lint:tsc": "tsc --noEmit",
"lint:eslint": "eslint .",
"start": "node --loader=./dist/src/aliasLoader.js ./bin/www.js",
"test": "NODE_ENV=test LOG_LEVEL=silent LOG_PRETTY=true vitest run --sequence.shuffle"
},
"dependencies": {
"@speckle/shared": "workspace:^",
"bcrypt": "^5.0.1",
"crypto-random-string": "^3.3.1",
"bcrypt": "^5.0.0",
"crypto": "^1.0.1",
"crypto-random-string": "^3.2.0",
"dotenv": "^16.4.5",
"esm-module-alias": "^2.2.0",
"knex": "^2.5.1",
"lodash": "^4.17.21",
"lodash-es": "^4.17.21",
"pg": "^8.7.3",
"pino": "^8.7.0",
"pino-http": "^8.0.0",
"pino-pretty": "^9.1.1",
"prom-client": "^14.0.1",
"tarn": "^3.0.2",
"undici": "^5.28.4",
"valid-filename": "^3.1.0",
"web-ifc": "^0.0.36",
"znv": "^0.4.0",
"zod": "^3.22.4"
"web-ifc": "^0.0.36"
},
"devDependencies": {
"cross-env": "^7.0.3",
"@types/bcrypt": "^5.0.0",
"@types/lodash-es": "^4.17.6",
"@types/node": "^18.19.38",
"@vitest/coverage-istanbul": "^1.6.0",
"concurrently": "^8.2.2",
"eslint": "^9.4.0",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-vitest": "^0.5.4",
"nodemon": "^2.0.20",
"prettier": "^2.5.1"
"prettier": "^2.5.1",
"rimraf": "^5.0.7",
"typescript": "^4.6.4",
"typescript-eslint": "^7.12.0",
"vitest": "^1.6.0"
}
}
@@ -0,0 +1,6 @@
import generateAliasesResolver from 'esm-module-alias'
import { srcRoot } from './root.js'
export const resolve = generateAliasesResolver({
'@': srcRoot
})
+9
View File
@@ -0,0 +1,9 @@
import '@/bootstrap.js' // This has side-effects and has to be imported first
import { main } from '@/src/daemon.js'
const start = () => {
void main()
}
start()
@@ -0,0 +1,2 @@
import dotenv from 'dotenv'
dotenv.config()
@@ -1,32 +0,0 @@
/* istanbul ignore file */
'use strict'
const fs = require('fs')
const path = require('node:path')
const { stream, fetch } = require('undici')
module.exports = {
async downloadFile({ fileId, streamId, token, destination }) {
fs.mkdirSync(path.dirname(destination), { recursive: true })
await stream(
`${process.env.SPECKLE_SERVER_URL}/api/stream/${streamId}/blob/${fileId}`,
{
opaque: fs.createWriteStream(destination),
headers: {
Authorization: `Bearer ${token}`
}
},
({ opaque }) => opaque
)
},
async getFileInfoByName({ fileName, streamId, token }) {
const response = await fetch(
`${process.env.SPECKLE_SERVER_URL}/api/stream/${streamId}/blobs?fileName=${fileName}`,
{
headers: {
Authorization: `Bearer ${token}`
}
}
)
return response.json()
}
}
@@ -4,9 +4,7 @@
"tools": {
"csharpier": {
"version": "0.30.1",
"commands": [
"dotnet-csharpier"
]
"commands": ["dotnet-csharpier"]
}
}
}
}
@@ -1,18 +1,18 @@
'use strict'
const Environment = require('@speckle/shared/dist/commonjs/environment/index.js')
const {
import Environment from '@speckle/shared/dist/commonjs/environment/index.js'
import {
loadMultiRegionsConfig,
configureKnexClient
} = require('@speckle/shared/dist/commonjs/environment/multiRegionConfig.js')
const { logger } = require('./observability/logging')
} from '@speckle/shared/dist/commonjs/environment/multiRegionConfig.js'
import { logger } from '@/observability/logging.js'
import { Knex } from 'knex'
const { FF_WORKSPACES_MULTI_REGION_ENABLED } = Environment.getFeatureFlags()
const isDevEnv = process.env.NODE_ENV !== 'production'
let dbClients
const getDbClients = async () => {
type DbClient = { public: Knex; private?: Knex }
let dbClients: { [key: string]: DbClient }
export const getDbClients = async () => {
if (dbClients) return dbClients
const maxConnections = parseInt(
process.env['POSTGRES_MAX_CONNECTIONS_FILE_IMPORT_SERVICE'] || '1'
@@ -49,7 +49,10 @@ const getDbClients = async () => {
} else {
const configPath = process.env.MULTI_REGION_CONFIG_PATH || 'multiregion.json'
const config = await loadMultiRegionsConfig({ path: configPath })
const clients = [['main', configureKnexClient(config.main, configArgs)]]
const clients: [string, DbClient][] = [
['main', configureKnexClient(config.main, configArgs)]
]
Object.entries(config.regions).map(([key, config]) => {
clients.push([key, configureKnexClient(config, configArgs)])
})
@@ -57,5 +60,3 @@ const getDbClients = async () => {
}
return dbClients
}
module.exports = getDbClients
@@ -1,59 +0,0 @@
'use strict'
const events = require('events')
const fs = require('fs')
const readline = require('readline')
const path = require('path')
const { downloadFile, getFileInfoByName } = require('./filesApi')
const isValidFilename = require('valid-filename')
const { logger } = require('../observability/logging')
const getReferencedMtlFiles = async ({ objFilePath }) => {
const mtlFiles = []
try {
const rl = readline.createInterface({
input: fs.createReadStream(objFilePath),
crlfDelay: Infinity
})
rl.on('line', (line) => {
if (line.startsWith('mtllib ')) {
const mtlFile = line.slice('mtllib '.length).trim()
mtlFiles.push(mtlFile)
}
})
await events.once(rl, 'close')
} catch (err) {
logger.error(err, `Error getting dependencies for file ${objFilePath}`)
}
return mtlFiles
}
module.exports = {
async downloadDependencies({ objFilePath, streamId, destinationDir, token }) {
const dependencies = await getReferencedMtlFiles({ objFilePath })
logger.info(`Obj file depends on ${dependencies}`)
for (const mtlFile of dependencies) {
// there might be multiple files named with the same name, take the first...
const [file] = (await getFileInfoByName({ fileName: mtlFile, streamId, token }))
.blobs
if (!file) {
logger.info(`OBJ dependency file not found in stream: ${mtlFile}`)
continue
}
if (!isValidFilename(mtlFile)) {
logger.warn(`Invalid filename reference in OBJ dependencies: ${mtlFile}`)
continue
}
await downloadFile({
fileId: file.id,
streamId,
token,
destination: path.join(destinationDir, mtlFile)
})
}
}
}
@@ -1,14 +1,10 @@
const Observability = require('@speckle/shared/dist/commonjs/observability/index.js')
import Observability from '@speckle/shared/dist/commonjs/observability/index.js'
// loggers for specific components within normal operation
const logger = Observability.extendLoggerComponent(
export const logger = Observability.extendLoggerComponent(
Observability.getLogger(
process.env.LOG_LEVEL || 'info',
process.env.LOG_PRETTY === 'true'
),
'fileimport-service'
)
module.exports = {
logger
}
@@ -1,166 +0,0 @@
/* eslint-disable no-unused-vars */
'use strict'
const http = require('http')
const prometheusClient = require('prom-client')
const getDbClients = require('../knex')
let metricFree = null
let metricUsed = null
let metricPendingAquires = null
let metricPendingCreates = null
let metricPendingValidations = null
let metricRemainingCapacity = null
let metricQueryDuration = null
let metricQueryErrors = null
const queryStartTime = {}
prometheusClient.register.clear()
prometheusClient.register.setDefaultLabels({
project: 'speckle-server',
app: 'fileimport-service'
})
prometheusClient.collectDefaultMetrics()
let prometheusInitialized = false
const initDBPrometheusMetricsFactory =
({ db }) =>
() => {
metricFree = new prometheusClient.Gauge({
name: 'speckle_server_knex_free',
help: 'Number of free DB connections',
collect() {
this.set(db.client.pool.numFree())
}
})
metricUsed = new prometheusClient.Gauge({
name: 'speckle_server_knex_used',
help: 'Number of used DB connections',
collect() {
this.set(db.client.pool.numUsed())
}
})
metricPendingAquires = new prometheusClient.Gauge({
name: 'speckle_server_knex_pending',
help: 'Number of pending DB connection aquires',
collect() {
this.set(db.client.pool.numPendingAcquires())
}
})
metricPendingCreates = new prometheusClient.Gauge({
name: 'speckle_server_knex_pending_creates',
help: 'Number of pending DB connection creates',
collect() {
this.set(db.client.pool.numPendingCreates())
}
})
metricPendingValidations = new prometheusClient.Gauge({
name: 'speckle_server_knex_pending_validations',
help: 'Number of pending DB connection validations. This is a state between pending acquisition and acquiring a connection.',
collect() {
this.set(db.client.pool.numPendingValidations())
}
})
metricRemainingCapacity = new prometheusClient.Gauge({
name: 'speckle_server_knex_remaining_capacity',
help: 'Remaining capacity of the DB connection pool',
collect() {
const postgresMaxConnections =
parseInt(process.env.POSTGRES_MAX_CONNECTIONS_FILE_IMPORT_SERVICE) || 1
const demand =
db.client.pool.numUsed() +
db.client.pool.numPendingCreates() +
db.client.pool.numPendingValidations() +
db.client.pool.numPendingAcquires()
this.set(Math.max(postgresMaxConnections - demand, 0))
}
})
metricQueryDuration = new prometheusClient.Summary({
name: 'speckle_server_knex_query_duration',
help: 'Summary of the DB query durations in seconds'
})
metricQueryErrors = new prometheusClient.Counter({
name: 'speckle_server_knex_query_errors',
help: 'Number of DB queries with errors'
})
db.on('query', (data) => {
const queryId = data.__knexQueryUid + ''
queryStartTime[queryId] = Date.now()
})
db.on('query-response', (data, obj, builder) => {
const queryId = obj.__knexQueryUid + ''
const durationSec = (Date.now() - queryStartTime[queryId]) / 1000
delete queryStartTime[queryId]
if (!isNaN(durationSec)) metricQueryDuration.observe(durationSec)
})
db.on('query-error', (err, querySpec) => {
const queryId = querySpec.__knexQueryUid + ''
const durationSec = (Date.now() - queryStartTime[queryId]) / 1000
delete queryStartTime[queryId]
if (!isNaN(durationSec)) metricQueryDuration.observe(durationSec)
metricQueryErrors.inc()
})
}
module.exports = {
async initPrometheusMetrics() {
if (prometheusInitialized) return
prometheusInitialized = true
const db = (await getDbClients()).main.public
initDBPrometheusMetricsFactory({ db })()
// Define the HTTP server
const server = http.createServer(async (req, res) => {
if (req.url === '/metrics') {
res.setHeader('Content-Type', prometheusClient.register.contentType)
res.end(await prometheusClient.register.metrics())
} else {
res.end('Speckle FileImport Service - prometheus metrics')
}
})
server.listen(Number(process.env.PROMETHEUS_METRICS_PORT) || 9093)
},
metricDuration: new prometheusClient.Histogram({
name: 'speckle_server_operation_duration',
help: 'Summary of the operation durations in seconds',
buckets: [0.5, 1, 5, 10, 30, 60, 300, 600, 900, 1200],
labelNames: ['op']
}),
metricOperationErrors: new prometheusClient.Counter({
name: 'speckle_server_operation_errors',
help: 'Number of operations with errors',
labelNames: ['op']
}),
metricInputFileSize: new prometheusClient.Histogram({
name: 'speckle_server_operation_file_size',
help: 'Size of the operation input file size',
buckets: [
1000,
100 * 1000,
500 * 1000,
1000 * 1000,
5 * 1000 * 1000,
10 * 1000 * 1000,
100 * 1000 * 1000
],
labelNames: ['op']
})
}
+21
View File
@@ -0,0 +1,21 @@
import path from 'node:path'
import fs from 'node:fs'
import { fileURLToPath } from 'url'
/**
* Singleton module for src root and package root directory resolution
*/
const __filename = fileURLToPath(import.meta.url)
const srcRoot = path.dirname(__filename)
// Recursively walk back from __dirname till we find our package.json
let packageRoot = srcRoot
while (packageRoot !== '/') {
if (fs.readdirSync(packageRoot).includes('package.json')) {
break
}
packageRoot = path.resolve(packageRoot, '..')
}
export { srcRoot, packageRoot }
@@ -1,23 +1,57 @@
'use strict'
const crypto = require('crypto')
const crs = require('crypto-random-string')
const bcrypt = require('bcrypt')
const { chunk } = require('lodash')
const { logger: parentLogger } = require('../observability/logging')
import crypto from 'crypto'
import crs from 'crypto-random-string'
import bcrypt from 'bcrypt'
import { chunk } from 'lodash'
import { logger as parentLogger } from '@/observability/logging.js'
import Observability from '@speckle/shared/dist/commonjs/observability/index.js'
import type { Knex } from 'knex'
import type { Logger } from 'pino'
import { ForceRequired } from '@speckle/shared/dist/commonjs/index.js'
const Observability = require('@speckle/shared/dist/commonjs/observability/index.js')
const tables = (db) => ({
const tables = (db: Knex) => ({
objects: db('objects'),
closures: db('object_children_closure'),
branches: db('branches'),
branches: db<{
id: string
streamId: string
authorId: string
name: string
description: string
}>('branches'),
streams: db('streams'),
apiTokens: db('api_tokens'),
tokenScopes: db('token_scopes')
})
module.exports = class ServerAPI {
constructor({ db, streamId, logger }) {
type SpeckleObject = {
id?: string
hash?: string
streamId: string
__closure?: Record<string, number>
__tree?: unknown
speckleType: string
totalChildrenCount?: number
totalChildrenCountByDepth?: string
data: unknown
}
export class ServerAPI {
tables: ReturnType<typeof tables>
db: Knex
streamId: string
isSending: boolean
buffer: unknown[]
logger: Logger
constructor({
db,
streamId,
logger
}: {
db: Knex
streamId: string
logger: Logger
}) {
this.tables = tables(db)
this.db = db
this.streamId = streamId
@@ -28,7 +62,7 @@ module.exports = class ServerAPI {
Observability.extendLoggerComponent(parentLogger.child({ streamId }), 'ifc')
}
async saveObject(obj) {
async saveObject(obj: SpeckleObject) {
if (!obj) throw new Error('Null object')
if (!obj.id) {
@@ -40,15 +74,21 @@ module.exports = class ServerAPI {
return obj.id
}
async saveObjectBatch(objs) {
async saveObjectBatch(objs: SpeckleObject[]) {
return await this.createObjectsBatched(this.streamId, objs)
}
async createObject({ streamId, object }) {
async createObject({
streamId,
object
}: {
streamId: string
object: SpeckleObject
}) {
const insertionObject = this.prepInsertionObject(streamId, object)
const closures = []
const totalChildrenCountByDepth = {}
const totalChildrenCountByDepth: Record<string, number> = {}
if (object.__closure !== null) {
for (const prop in object.__closure) {
closures.push({
@@ -81,16 +121,21 @@ module.exports = class ServerAPI {
return insertionObject.id
}
async createObjectsBatched(streamId, objects) {
const closures = []
const objsToInsert = []
const ids = []
async createObjectsBatched(streamId: string, objects: SpeckleObject[]) {
const closures: {
streamId: string
parent: string | undefined
child: string | undefined
minDepth: number
}[] = []
const objsToInsert: ForceRequired<SpeckleObject, 'id'>[] = []
const ids: string[] = []
// Prep objects up
objects.forEach((obj) => {
const insertionObject = this.prepInsertionObject(streamId, obj)
let totalChildrenCountGlobal = 0
const totalChildrenCountByDepth = {}
const totalChildrenCountByDepth: Record<string, number> = {}
if (obj.__closure !== null) {
for (const prop in obj.__closure) {
@@ -159,7 +204,10 @@ module.exports = class ServerAPI {
return ids
}
prepInsertionObject(streamId, obj) {
prepInsertionObject(
streamId: string,
obj: SpeckleObject
): ForceRequired<SpeckleObject, 'id'> {
const maximumObjectSizeMB = parseInt(process.env['MAX_OBJECT_SIZE_MB'] || '10')
const MAX_OBJECT_SIZE = maximumObjectSizeMB * 1024 * 1024
@@ -183,23 +231,31 @@ module.exports = class ServerAPI {
}
}
prepInsertionObjectBatch(batch) {
prepInsertionObjectBatch(batch: Array<{ id: string }>) {
batch.sort((a, b) => (a.id > b.id ? 1 : -1))
}
prepInsertionClosureBatch(batch) {
prepInsertionClosureBatch(
batch: Array<{ parent: string | undefined; child: string | undefined }>
) {
batch.sort((a, b) =>
a.parent > b.parent
a.parent && b.parent && a.parent > b.parent
? 1
: a.parent === b.parent
? a.child > b.child
? a.child && b.child && a.child > b.child
? 1
: -1
: -1
)
}
async getBranchByNameAndStreamId({ streamId, name }) {
async getBranchByNameAndStreamId({
streamId,
name
}: {
streamId: string
name: string
}) {
const query = this.tables.branches
.select('*')
.where({ streamId })
@@ -208,13 +264,24 @@ module.exports = class ServerAPI {
return await query
}
async createBranch({ name, description, streamId, authorId }) {
const branch = {}
branch.id = crs({ length: 10 })
branch.streamId = streamId
branch.authorId = authorId
branch.name = name.toLowerCase()
branch.description = description
async createBranch({
name,
description,
streamId,
authorId
}: {
name: string
description: string
streamId: string
authorId: string
}) {
const branch = {
id: crs({ length: 10 }),
streamId,
authorId,
name: name.toLowerCase(),
description
}
await this.tables.branches.returning('id').insert(branch)
@@ -235,7 +302,17 @@ module.exports = class ServerAPI {
return { tokenId, tokenString, tokenHash, lastChars }
}
async createToken({ userId, name, scopes, lifespan }) {
async createToken({
userId,
name,
scopes,
lifespan
}: {
userId: string
name: string
scopes: string[]
lifespan: number
}) {
const { tokenId, tokenString, tokenHash, lastChars } = await this.createBareToken()
if (scopes.length === 0) throw new Error('No scopes provided')
@@ -256,7 +333,7 @@ module.exports = class ServerAPI {
return { id: tokenId, token: tokenId + tokenString }
}
async revokeTokenById(tokenId) {
async revokeTokenById(tokenId: string) {
const delCount = await this.tables.apiTokens
.where({ id: tokenId.slice(0, 10) })
.del()
@@ -1,22 +1,23 @@
'use strict'
const Environment = require('@speckle/shared/dist/commonjs/environment/index.js')
const {
import Environment from '@speckle/shared/dist/commonjs/environment/index.js'
import {
initPrometheusMetrics,
metricDuration,
metricInputFileSize,
metricOperationErrors
} = require('./prometheusMetrics')
const getDbClients = require('../knex')
} from '@/src/prometheusMetrics.js'
import { getDbClients } from '@/knex.js'
const { downloadFile } = require('./filesApi')
const fs = require('fs')
const { spawn } = require('child_process')
import { downloadFile } from '@/src/filesApi.js'
import fs from 'fs'
import { spawn } from 'child_process'
import { ServerAPI } from '@/src/api.js'
import { downloadDependencies } from '@/src/objDependencies.js'
import { logger } from '@/observability/logging.js'
import { Nullable, Scopes, wait } from '@speckle/shared'
import { Knex } from 'knex'
import { Logger } from 'pino'
const ServerAPI = require('./api')
const objDependencies = require('./objDependencies')
const { logger } = require('../observability/logging')
const { Scopes, wait } = require('@speckle/shared')
const { FF_FILEIMPORT_IFC_DOTNET_ENABLED } = Environment.getFeatureFlags()
const HEALTHCHECK_FILE_PATH = '/tmp/last_successful_query'
@@ -29,11 +30,11 @@ let shouldExit = false
let TIME_LIMIT = 10 * 60 * 1000
const providedTimeLimit = parseInt(process.env.FILE_IMPORT_TIME_LIMIT_MIN)
const providedTimeLimit = parseInt(process.env['FILE_IMPORT_TIME_LIMIT_MIN'] || '10')
if (providedTimeLimit) TIME_LIMIT = providedTimeLimit * 60 * 1000
async function startTask(knex) {
const { rows } = await knex.raw(`
async function startTask(knex: Knex) {
const { rows } = (await knex.raw(`
UPDATE file_uploads
SET
"convertedStatus" = 1,
@@ -46,18 +47,23 @@ async function startTask(knex) {
) as task
WHERE file_uploads."id" = task."id"
RETURNING file_uploads."id"
`)
`)) satisfies { rows: { id: string }[] }
return rows[0]
}
async function doTask(mainDb, regionName, taskDb, task) {
async function doTask(
mainDb: Knex,
regionName: string,
taskDb: Knex,
task: { id: string }
) {
const taskId = task.id
// Mark task as started
await mainDb.raw(`NOTIFY file_import_started, '${task.id}'`)
let taskLogger = logger.child({ taskId })
let tempUserToken = null
let tempUserToken: Nullable<string> = null
let mainServerApi = null
let taskServerApi = null
let fileTypeForMetric = 'unknown'
@@ -65,11 +71,24 @@ async function doTask(mainDb, regionName, taskDb, task) {
const metricDurationEnd = metricDuration.startTimer()
let newBranchCreated = false
let branchMetadata = { streamId: null, branchName: null }
let branchMetadata: { streamId: Nullable<string>; branchName: Nullable<string> } = {
streamId: null,
branchName: null
}
try {
taskLogger.info("Doing task '{taskId}'.")
const info = await taskDb('file_uploads').where({ id: taskId }).first()
const info = await taskDb<{
id: string
fileType: string
fileSize: string
fileName: string
userId: string
streamId: string
branchName: string
}>('file_uploads')
.where({ id: taskId })
.first()
if (!info) {
throw new Error('Internal error: DB inconsistent')
}
@@ -192,7 +211,7 @@ async function doTask(mainDb, regionName, taskDb, task) {
TIME_LIMIT
)
} else if (info.fileType.toLowerCase() === 'obj') {
await objDependencies.downloadDependencies({
await downloadDependencies({
objFilePath: TMP_FILE_PATH,
streamId: info.streamId,
destinationDir: TMP_INPUT_DIR,
@@ -224,9 +243,24 @@ async function doTask(mainDb, regionName, taskDb, task) {
throw new Error(`File type ${info.fileType} is not supported`)
}
const output = JSON.parse(fs.readFileSync(TMP_RESULTS_PATH))
const output: unknown = JSON.parse(fs.readFileSync(TMP_RESULTS_PATH, 'utf8'))
if (!output.success) throw new Error(output.error)
if (
!output ||
typeof output !== 'object' ||
!('success' in output) ||
!output.success ||
!('commitId' in output)
)
throw new Error(
output &&
typeof output === 'object' &&
'error' in output &&
output.error &&
typeof output.error === 'string'
? output.error
: 'Unknown error'
)
const commitId = output.commitId
@@ -243,7 +277,11 @@ async function doTask(mainDb, regionName, taskDb, task) {
[commitId, task.id]
)
} catch (err) {
taskLogger.error(err)
taskLogger.error(err, 'Error processing task')
const errorForDatabase =
err && (typeof err === 'string' || err instanceof Error)
? err.toString()
: 'Unknown error'
await taskDb.raw(
`
UPDATE file_uploads
@@ -254,7 +292,7 @@ async function doTask(mainDb, regionName, taskDb, task) {
WHERE "id" = ?
`,
// DB only accepts a varchar 255
[err.toString().substring(0, 254), task.id]
[errorForDatabase.substring(0, 254), task.id]
)
metricOperationErrors.labels(fileTypeForMetric).inc()
} finally {
@@ -271,12 +309,18 @@ async function doTask(mainDb, regionName, taskDb, task) {
fs.rmSync(TMP_INPUT_DIR, { force: true, recursive: true })
if (fs.existsSync(TMP_RESULTS_PATH)) fs.unlinkSync(TMP_RESULTS_PATH)
if (tempUserToken) {
if (mainServerApi && tempUserToken) {
await mainServerApi.revokeTokenById(tempUserToken)
}
}
function runProcessWithTimeout(processLogger, cmd, cmdArgs, extraEnv, timeoutMs) {
function runProcessWithTimeout(
processLogger: Logger,
cmd: string,
cmdArgs: string[],
extraEnv: Record<string, string>,
timeoutMs: number
): Promise<void> {
return new Promise((resolve, reject) => {
let boundLogger = processLogger.child({ cmd, args: cmdArgs })
boundLogger.info('Starting process.')
@@ -325,14 +369,14 @@ function runProcessWithTimeout(processLogger, cmd, cmdArgs, extraEnv, timeoutMs)
})
}
function handleData(data, isErr, logger) {
function handleData(data: unknown, isErr: boolean, logger: Logger) {
try {
Buffer.isBuffer(data) && (data = data.toString())
data.split('\n').forEach((line) => {
if (!Buffer.isBuffer(data)) return
const dataAsString = data.toString()
dataAsString.split('\n').forEach((line) => {
if (!line) return
try {
JSON.parse(line) // verify if the data is already in JSON format
process.stdout.write(line)
process.stdout.write('\n')
} catch {
wrapLogLine(line, isErr, logger)
@@ -343,7 +387,7 @@ function handleData(data, isErr, logger) {
}
}
function wrapLogLine(line, isErr, logger) {
function wrapLogLine(line: string, isErr: boolean, logger: Logger) {
if (isErr) {
logger.error({ parserLogLine: line }, 'ParserLog: {parserLogLine}')
return
@@ -356,7 +400,7 @@ const doStuff = async () => {
const mainDb = dbClients.main.public
const dbClientsIterator = infiniteDbClientsIterator(dbClients)
while (!shouldExit) {
const [regionName, taskDb] = dbClientsIterator.next().value
const [regionName, taskDb]: [string, Knex] = dbClientsIterator.next().value
try {
const task = await startTask(taskDb)
fs.writeFile(HEALTHCHECK_FILE_PATH, '' + Date.now(), () => {})
@@ -374,7 +418,7 @@ const doStuff = async () => {
}
}
async function main() {
export async function main() {
logger.info('Starting FileUploads Service...')
await initPrometheusMetrics()
@@ -387,7 +431,9 @@ async function main() {
process.exit(0)
}
function* infiniteDbClientsIterator(dbClients) {
function* infiniteDbClientsIterator(dbClients: {
[key: string]: { public: Knex }
}): Generator<[string, Knex], [string, Knex], [string, Knex]> {
let index = 0
const dbClientEntries = [...Object.entries(dbClients)]
const clientCount = dbClientEntries.length
@@ -399,5 +445,3 @@ function* infiniteDbClientsIterator(dbClients) {
yield [regionName, dbConnection.public]
}
}
main()
@@ -0,0 +1,48 @@
import fs from 'fs'
import path from 'node:path'
import { Writable } from 'node:stream'
import { stream, fetch } from 'undici'
export async function downloadFile({
fileId,
streamId,
token,
destination
}: {
fileId: string
streamId: string
token: string
destination: string
}) {
fs.mkdirSync(path.dirname(destination), { recursive: true })
await stream(
`${process.env.SPECKLE_SERVER_URL}/api/stream/${streamId}/blob/${fileId}`,
{
method: 'POST',
opaque: fs.createWriteStream(destination),
headers: {
Authorization: `Bearer ${token}`
}
},
({ opaque }) => opaque as Writable //FIXME
)
}
export async function getFileInfoByName({
fileName,
streamId,
token
}: {
fileName: string
streamId: string
token: string
}) {
const response = await fetch(
`${process.env.SPECKLE_SERVER_URL}/api/stream/${streamId}/blobs?fileName=${fileName}`,
{
headers: {
Authorization: `Bearer ${token}`
}
}
)
return response.json() as Promise<{ blobs: { id: string }[] }>
}
@@ -0,0 +1,66 @@
import events from 'events'
import fs from 'fs'
import readline from 'readline'
import path from 'path'
import isValidFilename from 'valid-filename'
import { downloadFile, getFileInfoByName } from '@/src/filesApi.js'
import { logger } from '@/observability/logging.js'
const getReferencedMtlFiles = async ({ objFilePath }: { objFilePath: string }) => {
const mtlFiles: string[] = []
try {
const rl = readline.createInterface({
input: fs.createReadStream(objFilePath),
crlfDelay: Infinity
})
rl.on('line', (line) => {
if (line.startsWith('mtllib ')) {
const mtlFile = line.slice('mtllib '.length).trim()
mtlFiles.push(mtlFile)
}
})
await events.once(rl, 'close')
} catch (err) {
logger.error(err, `Error getting dependencies for file ${objFilePath}`)
}
return mtlFiles
}
export async function downloadDependencies({
objFilePath,
streamId,
destinationDir,
token
}: {
objFilePath: string
streamId: string
destinationDir: string
token: string
}) {
const dependencies = await getReferencedMtlFiles({ objFilePath })
logger.info(`Obj file depends on ${dependencies.join(', ')}`)
for (const mtlFile of dependencies) {
// there might be multiple files named with the same name, take the first...
const [file] = (await getFileInfoByName({ fileName: mtlFile, streamId, token }))
.blobs
if (!file) {
logger.info(`OBJ dependency file not found in stream: ${mtlFile}`)
continue
}
if (!isValidFilename(mtlFile)) {
logger.warn(`Invalid filename reference in OBJ dependencies: ${mtlFile}`)
continue
}
await downloadFile({
fileId: file.id,
streamId,
token,
destination: path.join(destinationDir, mtlFile)
})
}
}
@@ -0,0 +1,169 @@
import http from 'http'
import prometheusClient, { Counter, Summary } from 'prom-client'
import { getDbClients } from '@/knex.js'
import { Knex } from 'knex'
import { Pool } from 'tarn'
import { isObject } from 'lodash'
import { IncomingMessage } from 'http'
let metricQueryDuration: Summary<string> | null = null
let metricQueryErrors: Counter<string> | null = null
const queryStartTime: Record<string, number> = {}
prometheusClient.register.clear()
prometheusClient.register.setDefaultLabels({
project: 'speckle-server',
app: 'fileimport-service'
})
prometheusClient.collectDefaultMetrics()
let prometheusInitialized = false
const initDBPrometheusMetricsFactory =
({ db }: { db: Knex }) =>
() => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
const dbConnectionPool = db.client.pool as Pool<unknown>
new prometheusClient.Gauge({
name: 'speckle_server_knex_free',
help: 'Number of free DB connections',
collect() {
this.set(dbConnectionPool.numFree())
}
})
new prometheusClient.Gauge({
name: 'speckle_server_knex_used',
help: 'Number of used DB connections',
collect() {
this.set(dbConnectionPool.numUsed())
}
})
new prometheusClient.Gauge({
name: 'speckle_server_knex_pending',
help: 'Number of pending DB connection aquires',
collect() {
this.set(dbConnectionPool.numPendingAcquires())
}
})
new prometheusClient.Gauge({
name: 'speckle_server_knex_pending_creates',
help: 'Number of pending DB connection creates',
collect() {
this.set(dbConnectionPool.numPendingCreates())
}
})
new prometheusClient.Gauge({
name: 'speckle_server_knex_pending_validations',
help: 'Number of pending DB connection validations. This is a state between pending acquisition and acquiring a connection.',
collect() {
this.set(dbConnectionPool.numPendingValidations())
}
})
new prometheusClient.Gauge({
name: 'speckle_server_knex_remaining_capacity',
help: 'Remaining capacity of the DB connection pool',
collect() {
const postgresMaxConnections = parseInt(
process.env['POSTGRES_MAX_CONNECTIONS_FILE_IMPORT_SERVICE'] || '1'
)
const demand =
dbConnectionPool.numUsed() +
dbConnectionPool.numPendingCreates() +
dbConnectionPool.numPendingValidations() +
dbConnectionPool.numPendingAcquires()
this.set(Math.max(postgresMaxConnections - demand, 0))
}
})
metricQueryDuration = new prometheusClient.Summary({
name: 'speckle_server_knex_query_duration',
help: 'Summary of the DB query durations in seconds'
})
metricQueryErrors = new prometheusClient.Counter({
name: 'speckle_server_knex_query_errors',
help: 'Number of DB queries with errors'
})
db.on('query', (data) => {
if (!isObject(data) || !('__knexQueryUid' in data)) return
const queryId = String(data.__knexQueryUid)
queryStartTime[queryId] = Date.now()
})
db.on('query-response', (_data, obj) => {
if (!isObject(obj) || !('__knexQueryUid' in obj)) return
const queryId = String(obj.__knexQueryUid)
const durationSec = (Date.now() - queryStartTime[queryId]) / 1000
delete queryStartTime[queryId]
if (metricQueryDuration && !isNaN(durationSec))
metricQueryDuration.observe(durationSec)
})
db.on('query-error', (_err, querySpec) => {
if (!isObject(querySpec) || !('__knexQueryUid' in querySpec)) return
const queryId = String(querySpec.__knexQueryUid)
const durationSec = (Date.now() - queryStartTime[queryId]) / 1000
delete queryStartTime[queryId]
if (metricQueryDuration && !isNaN(durationSec))
metricQueryDuration.observe(durationSec)
if (metricQueryErrors) metricQueryErrors.inc()
})
}
export async function initPrometheusMetrics() {
if (prometheusInitialized) return
prometheusInitialized = true
const db = (await getDbClients()).main.public
initDBPrometheusMetricsFactory({ db })()
const requestHandler = (req: IncomingMessage, res: http.OutgoingMessage) => {
if (req.url === '/metrics') {
res.setHeader('Content-Type', prometheusClient.register.contentType)
res.end(Promise.resolve(prometheusClient.register.metrics()))
} else {
res.end('Speckle FileImport Service - prometheus metrics')
}
}
// Define the HTTP server
const server = http.createServer(requestHandler)
server.listen(Number(process.env.PROMETHEUS_METRICS_PORT) || 9093)
}
export const metricDuration = new prometheusClient.Histogram({
name: 'speckle_server_operation_duration',
help: 'Summary of the operation durations in seconds',
buckets: [0.5, 1, 5, 10, 30, 60, 300, 600, 900, 1200],
labelNames: ['op']
})
export const metricOperationErrors = new prometheusClient.Counter({
name: 'speckle_server_operation_errors',
help: 'Number of operations with errors',
labelNames: ['op']
})
export const metricInputFileSize = new prometheusClient.Histogram({
name: 'speckle_server_operation_file_size',
help: 'Size of the operation input file size',
buckets: [
1000,
100 * 1000,
500 * 1000,
1000 * 1000,
5 * 1000 * 1000,
10 * 1000 * 1000,
100 * 1000 * 1000
],
labelNames: ['op']
})
@@ -0,0 +1,5 @@
{
"extends": "./tsconfig.json",
"include": ["src/**/*"],
"exclude": ["**/*.spec.js", "**/*.spec.ts"]
}
+108
View File
@@ -0,0 +1,108 @@
{
"compilerOptions": {
/* Visit https://aka.ms/tsconfig.json to read more about this file */
/* Projects */
// "incremental": true, /* Enable incremental compilation */
// "composite": true, /* Enable constraints that allow a TypeScript project to be used with project references. */
// "tsBuildInfoFile": "./", /* Specify the folder for .tsbuildinfo incremental compilation files. */
// "disableSourceOfProjectReferenceRedirect": true, /* Disable preferring source files instead of declaration files when referencing composite projects */
// "disableSolutionSearching": true, /* Opt a project out of multi-project reference checking when editing. */
// "disableReferencedProjectLoad": true, /* Reduce the number of projects loaded automatically by TypeScript. */
/* Language and Environment */
"target": "ES2022" /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */,
// "lib": [], /* Specify a set of bundled library declaration files that describe the target runtime environment. */
// "jsx": "preserve", /* Specify what JSX code is generated. */
// "experimentalDecorators": true, /* Enable experimental support for TC39 stage 2 draft decorators. */
// "emitDecoratorMetadata": true, /* Emit design-type metadata for decorated declarations in source files. */
// "jsxFactory": "", /* Specify the JSX factory function used when targeting React JSX emit, e.g. 'React.createElement' or 'h' */
// "jsxFragmentFactory": "", /* Specify the JSX Fragment reference used for fragments when targeting React JSX emit e.g. 'React.Fragment' or 'Fragment'. */
// "jsxImportSource": "", /* Specify module specifier used to import the JSX factory functions when using `jsx: react-jsx*`.` */
// "reactNamespace": "", /* Specify the object invoked for `createElement`. This only applies when targeting `react` JSX emit. */
// "noLib": true, /* Disable including any library files, including the default lib.d.ts. */
// "useDefineForClassFields": true, /* Emit ECMAScript-standard-compliant class fields. */
/* Modules */
"module": "node16" /* Specify what module code is generated. */,
"rootDir": "./" /* Specify the root folder within your source files. */,
"moduleResolution": "node16" /* Specify how TypeScript looks up a file from a given module specifier. */,
"baseUrl": "./" /* Specify the base directory to resolve non-relative module names. */,
"paths": {
"@/*": ["./src/*"]
},
// "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */
// "typeRoots": [], /* Specify multiple folders that act like `./node_modules/@types`. */
// "types": [], /* Specify type package names to be included without being referenced in a source file. */
// "allowUmdGlobalAccess": true, /* Allow accessing UMD globals from modules. */
// "resolveJsonModule": true, /* Enable importing .json files */
// "noResolve": true, /* Disallow `import`s, `require`s or `<reference>`s from expanding the number of files TypeScript should add to a project. */
/* JavaScript Support */
"allowJs": true /* Allow JavaScript files to be a part of your program. Use the `checkJS` option to get errors from these files. */,
"checkJs": false /* Enable error reporting in type-checked JavaScript files. */,
// "maxNodeModuleJsDepth": 1, /* Specify the maximum folder depth used for checking JavaScript files from `node_modules`. Only applicable with `allowJs`. */
/* Emit */
// "declaration": true, /* Generate .d.ts files from TypeScript and JavaScript files in your project. */
// "declarationMap": true, /* Create sourcemaps for d.ts files. */
// "emitDeclarationOnly": true, /* Only output d.ts files and not JavaScript files. */
"sourceMap": true /* Create source map files for emitted JavaScript files. */,
// "outFile": "./", /* Specify a file that bundles all outputs into one JavaScript file. If `declaration` is true, also designates a file that bundles all .d.ts output. */
"outDir": "./dist" /* Specify an output folder for all emitted files. */,
// "removeComments": true, /* Disable emitting comments. */
// "noEmit": true, /* Disable emitting files from a compilation. */
// "importHelpers": true, /* Allow importing helper functions from tslib once per project, instead of including them per-file. */
// "importsNotUsedAsValues": "remove", /* Specify emit/checking behavior for imports that are only used for types */
// "downlevelIteration": true, /* Emit more compliant, but verbose and less performant JavaScript for iteration. */
// "sourceRoot": "", /* Specify the root path for debuggers to find the reference source code. */
// "mapRoot": "", /* Specify the location where debugger should locate map files instead of generated locations. */
// "inlineSourceMap": true, /* Include sourcemap files inside the emitted JavaScript. */
// "inlineSources": true, /* Include source code in the sourcemaps inside the emitted JavaScript. */
// "emitBOM": true, /* Emit a UTF-8 Byte Order Mark (BOM) in the beginning of output files. */
// "newLine": "crlf", /* Set the newline character for emitting files. */
// "stripInternal": true, /* Disable emitting declarations that have `@internal` in their JSDoc comments. */
// "noEmitHelpers": true, /* Disable generating custom helper functions like `__extends` in compiled output. */
// "noEmitOnError": true, /* Disable emitting files if any type checking errors are reported. */
// "preserveConstEnums": true, /* Disable erasing `const enum` declarations in generated code. */
// "declarationDir": "./", /* Specify the output directory for generated declaration files. */
// "preserveValueImports": true, /* Preserve unused imported values in the JavaScript output that would otherwise be removed. */
/* Interop Constraints */
// "isolatedModules": true, /* Ensure that each file can be safely transpiled without relying on other imports. */
"allowSyntheticDefaultImports": true /* Allow 'import x from y' when a module doesn't have a default export. */,
"esModuleInterop": true /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables `allowSyntheticDefaultImports` for type compatibility. */,
// "preserveSymlinks": true, /* Disable resolving symlinks to their realpath. This correlates to the same flag in node. */
"forceConsistentCasingInFileNames": true /* Ensure that casing is correct in imports. */,
/* Type Checking */
"strict": true /* Enable all strict type-checking options. */,
// "noImplicitAny": true, /* Enable error reporting for expressions and declarations with an implied `any` type.. */
// "strictNullChecks": true, /* When type checking, take into account `null` and `undefined`. */
// "strictFunctionTypes": true, /* When assigning functions, check to ensure parameters and the return values are subtype-compatible. */
// "strictBindCallApply": true, /* Check that the arguments for `bind`, `call`, and `apply` methods match the original function. */
// "strictPropertyInitialization": true, /* Check for class properties that are declared but not set in the constructor. */
// "noImplicitThis": true, /* Enable error reporting when `this` is given the type `any`. */
// "useUnknownInCatchVariables": true, /* Type catch clause variables as 'unknown' instead of 'any'. */
// "alwaysStrict": true, /* Ensure 'use strict' is always emitted. */
// "noUnusedLocals": true, /* Enable error reporting when a local variables aren't read. */
// "noUnusedParameters": true, /* Raise an error when a function parameter isn't read */
// "exactOptionalPropertyTypes": true, /* Interpret optional property types as written, rather than adding 'undefined'. */
// "noImplicitReturns": true, /* Enable error reporting for codepaths that do not explicitly return in a function. */
// "noFallthroughCasesInSwitch": true, /* Enable error reporting for fallthrough cases in switch statements. */
// "noUncheckedIndexedAccess": true, /* Include 'undefined' in index signature results */
// "noImplicitOverride": true, /* Ensure overriding members in derived classes are marked with an override modifier. */
// "noPropertyAccessFromIndexSignature": true, /* Enforces using indexed accessors for keys declared using an indexed type */
// "allowUnusedLabels": true, /* Disable error reporting for unused labels. */
// "allowUnreachableCode": true, /* Disable error reporting for unreachable code. */
/* Completeness */
// "skipDefaultLibCheck": true, /* Skip type checking .d.ts files that are included with TypeScript. */
"skipLibCheck": true /* Skip type checking all .d.ts files. */
},
"ts-node": {
"swc": true
},
"include": ["src/**/*", "vitest.config.ts"],
"exclude": ["node_modules", "coverage", "reports"]
}
@@ -0,0 +1,18 @@
import path from 'path'
import { configDefaults, defineConfig } from 'vitest/config'
export default defineConfig({
test: {
exclude: [...configDefaults.exclude],
// reporters: ['verbose', 'hanging-process'] //uncomment to debug hanging processes etc.
sequence: {
shuffle: true,
concurrent: true
}
},
resolve: {
alias: {
'@': path.resolve(__dirname, './src')
}
}
})
@@ -6,9 +6,6 @@ import type { Counter, Histogram, Summary } from 'prom-client'
import prometheusClient from 'prom-client'
import { Pool } from 'tarn'
// let metricFree: Gauge<string> | null = null
// let metricUsed: Gauge<string> = null
// let metricPendingAquires: Gauge<string> | null = null
let metricQueryDuration: Summary<string> | null = null
let metricQueryErrors: Counter<string> | null = null
export let metricDuration: Histogram<string> | null = null
@@ -30,7 +27,7 @@ function initKnexPrometheusMetrics(params: { db: Knex }) {
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
const dbConnectionPool = db.client.pool as Pool<unknown>
//metricFree =
new prometheusClient.Gauge({
name: 'speckle_server_knex_free',
help: 'Number of free DB connections',
@@ -39,7 +36,6 @@ function initKnexPrometheusMetrics(params: { db: Knex }) {
}
})
//metricUsed =
new prometheusClient.Gauge({
name: 'speckle_server_knex_used',
help: 'Number of used DB connections',
@@ -48,7 +44,6 @@ function initKnexPrometheusMetrics(params: { db: Knex }) {
}
})
//metricPendingAquires =
new prometheusClient.Gauge({
name: 'speckle_server_knex_pending',
help: 'Number of pending DB connection aquires',
@@ -57,7 +52,6 @@ function initKnexPrometheusMetrics(params: { db: Knex }) {
}
})
//metricPendingCreates =
new prometheusClient.Gauge({
name: 'speckle_server_knex_pending_creates',
help: 'Number of pending DB connection creates',
@@ -66,7 +60,6 @@ function initKnexPrometheusMetrics(params: { db: Knex }) {
}
})
//metricPendingValidations =
new prometheusClient.Gauge({
name: 'speckle_server_knex_pending_validations',
help: 'Number of pending DB connection validations. This is a state between pending acquisition and acquiring a connection.',
@@ -6,6 +6,11 @@ export type MaybeNullOrUndefined<T> = T | null | undefined
export type MaybeAsync<T> = T | Promise<T>
export type MaybeFalsy<T> = T | null | undefined | false | '' | 0
/**
* Allows some optional properties to be required
*/
export type ForceRequired<T, K extends keyof T> = T & { [P in K]-?: T[P] }
/**
* In TS undefined !== void, so use this type guard to check for both
*/
+20 -8
View File
@@ -16812,24 +16812,36 @@ __metadata:
resolution: "@speckle/fileimport-service@workspace:packages/fileimport-service"
dependencies:
"@speckle/shared": "workspace:^"
bcrypt: "npm:^5.0.1"
cross-env: "npm:^7.0.3"
crypto-random-string: "npm:^3.3.1"
"@types/bcrypt": "npm:^5.0.0"
"@types/lodash-es": "npm:^4.17.6"
"@types/node": "npm:^18.19.38"
"@vitest/coverage-istanbul": "npm:^1.6.0"
bcrypt: "npm:^5.0.0"
concurrently: "npm:^8.2.2"
crypto: "npm:^1.0.1"
crypto-random-string: "npm:^3.2.0"
dotenv: "npm:^16.4.5"
eslint: "npm:^9.4.0"
eslint-config-prettier: "npm:^9.1.0"
eslint-plugin-vitest: "npm:^0.5.4"
esm-module-alias: "npm:^2.2.0"
knex: "npm:^2.5.1"
lodash: "npm:^4.17.21"
lodash-es: "npm:^4.17.21"
nodemon: "npm:^2.0.20"
pg: "npm:^8.7.3"
pino: "npm:^8.7.0"
pino-http: "npm:^8.0.0"
pino-pretty: "npm:^9.1.1"
prettier: "npm:^2.5.1"
prom-client: "npm:^14.0.1"
rimraf: "npm:^5.0.7"
tarn: "npm:^3.0.2"
typescript: "npm:^4.6.4"
typescript-eslint: "npm:^7.12.0"
undici: "npm:^5.28.4"
valid-filename: "npm:^3.1.0"
vitest: "npm:^1.6.0"
web-ifc: "npm:^0.0.36"
znv: "npm:^0.4.0"
zod: "npm:^3.22.4"
languageName: unknown
linkType: soft
@@ -25368,7 +25380,7 @@ __metadata:
languageName: node
linkType: hard
"bcrypt@npm:^5.0.0, bcrypt@npm:^5.0.1":
"bcrypt@npm:^5.0.0":
version: 5.1.1
resolution: "bcrypt@npm:5.1.1"
dependencies:
@@ -28097,7 +28109,7 @@ __metadata:
languageName: node
linkType: hard
"crypto-random-string@npm:^3.2.0, crypto-random-string@npm:^3.3.1":
"crypto-random-string@npm:^3.2.0":
version: 3.3.1
resolution: "crypto-random-string@npm:3.3.1"
dependencies: