c894f40f83
* feat(server): add server authz pipeline rework first sketch * feat(server authz): add new server authz middleware poc implementation * test(server authz): add unittests for the new server authz workflow * feat(wip rework of fileuploads vs blob storage): add basim impl of separate blob storage service * feat(fileimport service): refactored file import service to utilize the new asssetstorage service * refactor(server errors): refactor server errors to use the shared module definitions Now all the errors inherit from BaseError * refactor(fileimport service): cleanup after refactor * feat(frontend fileimports): use the new blob storage for downloading the original file * refactor(server fileimports): clean up the remnants of S3 storage from file imports * refactor(server authz): centralize generic authz pipeline configs * refactor(server blob storage): refactor / rename everything to use the `blob-storage` name * ci(circleci): add s3 objectstorage environment variables * ci(circleci): fix missing env variables * ci(circleci): add minio test container * ci(circleci): fix minio app startup * ci(circleci): enable circleci remote docker * ci(circleci): fix minio startup * ci(cirleci): detach and wait properly for minio to start * ci(circleci): revert to additional minio img config, it only fails when the container is stopped ?! 
* ci(circleci): disable file uploads * fix(fileimports): update with blob storage refactor leftovers * feat(server blob storage): add blob storage graphql api * refactor(server errors): merge new errors to shared module * fix(server comments rte): fix import for RTE error * chore(fileimports): remove node-fetch from dependency * chore(server): remove body parser dependency * fix(server blob storage): fix gql api * fix(frontend): fix fileupload item not loading the new upload status, cause of premature event fire * feat(server blob storage): fix file size limit and allow for public streams * Update packages/server/modules/blobstorage/graph/schemas/blobstorage.graphql Co-authored-by: Kristaps Fabians Geikins <fabis94@live.com> * chore(blobstorage): fix PR review issues * fix(server): fix import bugs * test(server): blob storage first test * test(server blob services): add tests for blob storage services * test(server blob storage): add service and rest api tests * test(server blob storage): add server blob storage graphql api tests * feat(server blob storage): store and make available blob fileHash attribute * feat(server authz): add fatal failure option to server authz pipeline * test(server authz): add optional stream context checks with tests * feat(monitor deployment): add shutdown signal handling to monitor deployment container Co-authored-by: Kristaps Fabians Geikins <fabis94@live.com>
136 lines
4.3 KiB
Python
136 lines
4.3 KiB
Python
#!/usr/bin/env python
|
|
import os
|
|
|
|
import psycopg2
|
|
from prometheus_client import start_http_server, Gauge
|
|
import time
|
|
import logging
|
|
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)

# Read eagerly on purpose: a missing variable raises KeyError at import time
# instead of failing later inside the polling loop.
PG_CONNECTION_STRING = os.environ['PG_CONNECTION_STRING']

# Every gauge exported by this process, keyed by a short internal name.
# The first five are plain gauges; the remaining four carry labels and are
# written through `.labels(...)`.
PROM = dict(
    db_size=Gauge(
        'speckle_db_size',
        'Size of the entire database (in bytes)',
    ),
    objects=Gauge(
        'speckle_db_objects',
        'Number of objects',
    ),
    streams=Gauge(
        'speckle_db_streams',
        'Number of streams',
    ),
    commits=Gauge(
        'speckle_db_commits',
        'Number of commits',
    ),
    users=Gauge(
        'speckle_db_users',
        'Number of users',
    ),
    fileimports=Gauge(
        'speckle_db_fileimports',
        'Number of imported files, by type and status',
        labelnames=('filetype', 'status'),
    ),
    webhooks=Gauge(
        'speckle_db_webhooks',
        'Number of webhook calls, by status',
        labelnames=('status',),
    ),
    previews=Gauge(
        'speckle_db_previews',
        'Number of previews, by status',
        labelnames=('status',),
    ),
    filesize=Gauge(
        'speckle_db_filesize',
        'Size of imported files, by type (in bytes)',
        labelnames=('filetype',),
    ),
)
|
|
|
|
|
|
# Status labels written for every status-labelled gauge.  Always writing the
# full set means a status that drops out of a query result (e.g. no more
# PENDING rows) is reset to 0 instead of keeping its previous value.
_STATUS_LABELS = tuple(str(code) for code in range(4))

# (PROM key, query) pairs for the unlabelled row-count gauges.
_ROW_COUNT_QUERIES = (
    ('objects', "SELECT count(*) FROM objects;"),
    ('streams', "SELECT count(*) FROM streams;"),
    ('commits', "SELECT count(*) FROM commits;"),
    ('users', "SELECT count(*) FROM users;"),
)


def _status_counts(rows):
    """Return ``{str(status): count}`` built from ``(status, count)`` rows."""
    return {str(status): count for status, count in rows}


def _export_status_counts(gauge, counts, leading_labels=()):
    """Set ``gauge`` once per label in ``_STATUS_LABELS``, defaulting to 0.

    ``counts`` maps a status label (string) to its value; statuses outside
    ``_STATUS_LABELS`` are ignored.  ``leading_labels`` is a tuple of label
    values placed before the status label.
    """
    for status in _STATUS_LABELS:
        gauge.labels(*(leading_labels + (status,))).set(counts.get(status, 0))


def tick(cur):
    """Run all monitoring queries on ``cur`` and update the ``PROM`` gauges.

    Args:
        cur: An open database cursor.  Its ``connection.info.dbname`` is used
            to look up the size of the connected database.

    Any error raised by a query or by a gauge update propagates to the
    caller; gauges updated before the failure keep their new values.
    """
    # Total DB size
    cur.execute('SELECT pg_database_size(%s)', (cur.connection.info.dbname,))
    PROM['db_size'].set(cur.fetchone()[0])

    # Counts for users, streams, commits, objects
    for metric, query in _ROW_COUNT_QUERIES:
        cur.execute(query)
        PROM[metric].set(cur.fetchone()[0])

    # File Imports
    cur.execute(
        '''
        SELECT "fileType", "convertedStatus", count(*)
        FROM file_uploads
        GROUP BY ("fileType", "convertedStatus")
        '''
    )
    # Group by file type first so that, for every type present, statuses
    # missing from the result are written as 0.
    # NOTE(review): a file type that disappears from the table entirely is
    # not reset here and keeps its last exported values.
    counts_by_type = {}
    for file_type, status, count in cur:
        counts_by_type.setdefault(file_type, {})[str(status)] = count
    for file_type, counts in counts_by_type.items():
        _export_status_counts(PROM['fileimports'], counts, (file_type,))

    cur.execute(
        '''
        SELECT "fileType", SUM("fileSize")
        FROM file_uploads
        GROUP BY "fileType"
        '''
    )
    for file_type, total_size in cur:
        # SUM() is NULL when every "fileSize" in the group is NULL; passing
        # None to Gauge.set() would raise and abort the rest of this tick.
        if total_size is None:
            total_size = 0
        PROM['filesize'].labels(file_type).set(total_size)

    # Webhooks
    cur.execute(
        '''
        SELECT status, count(*)
        FROM webhooks_events
        GROUP BY status
        '''
    )
    _export_status_counts(PROM['webhooks'], _status_counts(cur))

    # Previews
    cur.execute(
        '''
        SELECT "previewStatus", count(*)
        FROM object_preview
        GROUP BY "previewStatus"
        '''
    )
    _export_status_counts(PROM['previews'], _status_counts(cur))
|
|
|
|
def main(port=9092, interval_seconds=120):
    """Expose the metrics over HTTP and refresh them in an endless loop.

    Each iteration opens a fresh database connection, runs ``tick`` on a new
    cursor, closes both again and then sleeps.  A failing iteration is logged
    and does not stop the loop.

    Args:
        port: TCP port passed to ``start_http_server``.
        interval_seconds: Pause between two refreshes, in seconds.

    This function does not return.
    """
    start_http_server(port)

    while True:
        conn = None
        cur = None
        try:
            t0 = time.time()
            conn = psycopg2.connect(
                PG_CONNECTION_STRING,
                application_name='speckle_monitor_deployment',
            )
            cur = conn.cursor()
            t1 = time.time()
            tick(cur)
            t2 = time.time()
            LOG.info(
                "[%s] Updated metrics. (connected in %s, queried in %s)",
                t2, t1 - t0, t2 - t1,
            )
        except Exception as ex:
            # Top-level boundary: keep the exporter alive, but log the full
            # traceback so the failing query can be identified.
            LOG.exception("Error: %s", ex)
        finally:
            # Close each resource on its own: a failing cursor close must
            # neither leak the connection nor escape the loop and stop the
            # exporter.
            for resource in (cur, conn):
                if resource is None:
                    continue
                try:
                    resource.close()
                except Exception as ex:
                    LOG.exception("Error while closing database resource: %s", ex)

        time.sleep(interval_seconds)
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: set up root logging at INFO first so the periodic
    # "Updated metrics" messages are emitted, then start the polling loop.
    logging.basicConfig(level=logging.INFO)
    main()
|