From c16147463f35c8c256514dc26451cd58fd82cd2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20Jedlicska?= <57442769+gjedlicska@users.noreply.github.com> Date: Tue, 7 Oct 2025 14:07:34 +0200 Subject: [PATCH] feat(ifc importer): respect timeouts (#5561) * wip ifc subprocess rework * feat(ifc-import-service): move the import to a subprocess * feat(ifc): fix imports * fix(ifc-import-service): add job_processor to the docker container * fix(ifc-import-service): wrap the timeout exception and kill the process --- packages/ifc-import-service/Dockerfile | 3 +- packages/ifc-import-service/job_processor.py | 8 + packages/ifc-import-service/main.py | 4 +- .../src/ifc_importer/__init__.py | 3 - .../src/ifc_importer/client.py | 26 +++ .../src/ifc_importer/domain.py | 15 ++ .../src/ifc_importer/job_manager.py | 212 ++++++++++++++++++ .../src/ifc_importer/job_processor.py | 204 ----------------- .../src/ifc_importer/process_job.py | 65 ++++++ .../src/ifc_importer/repository.py | 13 +- 10 files changed, 339 insertions(+), 214 deletions(-) create mode 100644 packages/ifc-import-service/job_processor.py create mode 100644 packages/ifc-import-service/src/ifc_importer/client.py create mode 100644 packages/ifc-import-service/src/ifc_importer/job_manager.py delete mode 100644 packages/ifc-import-service/src/ifc_importer/job_processor.py create mode 100644 packages/ifc-import-service/src/ifc_importer/process_job.py diff --git a/packages/ifc-import-service/Dockerfile b/packages/ifc-import-service/Dockerfile index 939715a40..b66633815 100644 --- a/packages/ifc-import-service/Dockerfile +++ b/packages/ifc-import-service/Dockerfile @@ -36,13 +36,14 @@ ENV PATH=/app/bin:$PATH # Don't run your app as root. RUN groupadd -r app && \ -useradd -r -d /app -g app -N app + useradd -r -d /app -g app -N app STOPSIGNAL SIGINT COPY --from=build --chown=app:app /app /app COPY --chown=app:app ./packages/ifc-import-service/main.py /app +COPY --chown=app:app ./packages/ifc-import-service/job_processor.py /app # COPY --chown=appuser:appuser . . # # Switch to non-root user diff --git a/packages/ifc-import-service/job_processor.py b/packages/ifc-import-service/job_processor.py new file mode 100644 index 000000000..6b0d0f04c --- /dev/null +++ b/packages/ifc-import-service/job_processor.py @@ -0,0 +1,8 @@ +import base64 +from sys import argv + +from ifc_importer.process_job import process_job + +if __name__ == "__main__": + _, workdir_path, job_payload = argv + _ = process_job(workdir_path, base64.b64decode(job_payload).decode()) diff --git a/packages/ifc-import-service/main.py b/packages/ifc-import-service/main.py index 40412a000..97de53cf7 100644 --- a/packages/ifc-import-service/main.py +++ b/packages/ifc-import-service/main.py @@ -8,7 +8,7 @@ import structlog from prometheus_client import start_http_server from structlog_to_seq import CelfProcessor -from ifc_importer.job_processor import job_processor +from ifc_importer.job_manager import job_manager def configure_logger() -> structlog.stdlib.BoundLogger: @@ -49,7 +49,7 @@ class HealthcheckHTTPRequestHandler(BaseHTTPRequestHandler): async def main(): logger = configure_logger() - task = asyncio.create_task(job_processor(logger)) + task = asyncio.create_task(job_manager(logger)) httpd = HTTPServer(("0.0.0.0", 9080), HealthcheckHTTPRequestHandler) healthcheck_server_process = Process(target=httpd.serve_forever, daemon=True) healthcheck_server_process.start() diff --git a/packages/ifc-import-service/src/ifc_importer/__init__.py b/packages/ifc-import-service/src/ifc_importer/__init__.py index 377aaed83..e69de29bb 100644 --- a/packages/ifc-import-service/src/ifc_importer/__init__.py +++ b/packages/ifc-import-service/src/ifc_importer/__init__.py @@ -1,3 +0,0 @@ -from ifc_importer.job_processor import job_processor - -__all__ = ["job_processor"] diff --git a/packages/ifc-import-service/src/ifc_importer/client.py b/packages/ifc-import-service/src/ifc_importer/client.py new file mode 100644 index 000000000..8cf19456a --- /dev/null +++ b/packages/ifc-import-service/src/ifc_importer/client.py @@ -0,0 +1,26 @@ +from specklepy.core.api.client import ( # pyright: ignore[reportMissingTypeStubs] + SpeckleClient, +) + +from ifc_importer.domain import FileimportPayload + + +def setup_client(job_payload: FileimportPayload) -> SpeckleClient: + speckle_client = SpeckleClient( + job_payload.server_url, + job_payload.server_url.startswith("https"), + ) + speckle_client.authenticate_with_token(job_payload.token) + if not speckle_client.account: + msg = ( + f"Could not authenticate to {job_payload.server_url}", + "with the provided token", + ) + raise ValueError(msg) + + if not speckle_client.account.userInfo.email: + raise ValueError( + "activeUser.email did not get fetched. Does the token lack profile:email?" + ) + + return speckle_client diff --git a/packages/ifc-import-service/src/ifc_importer/domain.py b/packages/ifc-import-service/src/ifc_importer/domain.py index 500062b6b..27aeb247b 100644 --- a/packages/ifc-import-service/src/ifc_importer/domain.py +++ b/packages/ifc-import-service/src/ifc_importer/domain.py @@ -27,6 +27,21 @@ class FileimportPayload(JobBase): time_out_seconds: int +class FileimportSuccess(JobBase): + download_duration_seconds: float + parse_duration_seconds: float + version_id: str + + +class FileimportError(JobBase): + reason: str + stack_trace: str + + +class FileimportResult(JobBase): + outcome: FileimportSuccess | FileimportError + + class JobStatus(StrEnum): """Status enumeration for the job.""" diff --git a/packages/ifc-import-service/src/ifc_importer/job_manager.py b/packages/ifc-import-service/src/ifc_importer/job_manager.py new file mode 100644 index 000000000..3ede5f51a --- /dev/null +++ b/packages/ifc-import-service/src/ifc_importer/job_manager.py @@ -0,0 +1,212 @@ +import asyncio +import base64 +import tempfile +import time +from math import floor +from pathlib import Path + +import structlog +from specklepy.core.api.inputs.file_import_inputs import ( + FileImportErrorInput, + FileImportResult, + FileImportSuccessInput, +) +from specklepy.logging import metrics + +from ifc_importer.client import setup_client +from ifc_importer.domain import FileimportError, FileimportResult, JobStatus +from ifc_importer.repository import ( + deduct_from_compute_budget, + get_next_job, + return_job_to_queued, + setup_connection, +) + +IDLE_TIMEOUT = 1 + + +async def job_manager(logger: structlog.stdlib.BoundLogger): + parser = "speckle_ifc" + logger = logger.bind(parser=parser) + connection = await setup_connection() + logger.info("job processor started") + while True: + job = await get_next_job(connection) + if not job: + await asyncio.sleep(IDLE_TIMEOUT) + continue + + start = time.time() + duration = 0 + job_timeout = max( + 1, min(job.payload.time_out_seconds, job.remaining_compute_budget_seconds) + ) + + # Forcefully reset metrics, + # we don't want it to reuse any server/user ids between jobs + metrics.METRICS_TRACKER = None + metrics.HOST_APP = "ifc" + + speckle_client = setup_client(job.payload) + + job_id = job.id + job_status = JobStatus.QUEUED + ex: Exception | None = None + attempt = job.attempt + version_id: str | None = None + + # this will create a new temp directory and also delete it, + # when the with block closes + with tempfile.TemporaryDirectory() as temp_dir: + try: + # i do not get this why are we handling this here? + if attempt > job.max_attempt: + # something went wrong, it should have been marked as failed + raise Exception( + "Unhandled error silently failed the job multiple times" + ) + + logger = logger.bind(job_id=job_id, project_id=job.payload.project_id) + logger.info( + "starting job {job_id} for project {project_id}," + + " attempt {attempt} /" + + " {max_attempts} with remaining compute budget" + + " {remaining_compute_budget_seconds}s and timeout {job_timeout}s", + attempt=attempt, + max_attempts=job.max_attempt, + remaining_compute_budget_seconds=job.remaining_compute_budget_seconds, + job_timeout=job_timeout, + ) + cmd = ( + f"python job_processor.py {temp_dir}" + + f" { + base64.b64encode( + job.payload.model_dump_json().encode() + ).decode() + }" + ) + # subprocess + process = await asyncio.create_subprocess_shell( + cmd, + ) + try: + exit_code = await asyncio.wait_for( + process.wait(), timeout=job_timeout + ) + except TimeoutError as te: + process.kill() + raise Exception( + "Job was cancelled due to reaching the" + + f" {job_timeout} second timeout" + ) from te + # this should never happen, as the job processor is handling errors + # when the process is killed with a timeout we raise a TimeoutError + if exit_code != 0: + raise Exception(f"Job failed with exit code {exit_code}") + + result_path = Path(temp_dir, "result.json") + if not result_path.exists(): + # is this a special case? + raise Exception("Job exited without a result") + # temp_dir.join("result.json") + + outcome = FileimportResult.model_validate_json( + result_path.read_text() + ).outcome + + if isinstance(outcome, FileimportError): + logger.error( + "File import subprocess failed", exc_info=outcome.stack_trace + ) + raise Exception(outcome.reason) + + # except TimeoutError as te: + # print(te) + + # handler = job_handler(speckle_client, job.payload, logger) + # this will raise a TimeoutError if handler does not complete in time + # version, download_duration, parse_duration = await asyncio.wait_for( + # handler, timeout=job_timeout + # ) + version_id = outcome.version_id + + duration = time.time() - start + logger.info( + "Finished parsing job after {duration}s," + + " creating version {version_id}", + duration=duration, + version_id=version_id, + ) + + _ = speckle_client.file_import.finish_file_import_job( + FileImportSuccessInput( + project_id=job.payload.project_id, + # the blob id identifies the "job" here + job_id=job.payload.blob_id, + result=FileImportResult( + parser=parser, + version_id=version_id, + download_duration_seconds=outcome.download_duration_seconds, + duration_seconds=duration, + parse_duration_seconds=outcome.parse_duration_seconds, + ), + ) + ) + # the server is responsible for moving successful + # jobs to the succeeded state + # mark it as succeeded so we do not enter any error + # handling routines on finalisation + job_status = JobStatus.SUCCEEDED + + # raised if the task is canceled + except Exception as e: + # + ex = e + job_status = JobStatus.FAILED + finally: + if duration <= 0: + # it probably failed before we calculated the duration, + # so calculate it now + duration = time.time() - start + await deduct_from_compute_budget( + connection, logger, job_id, floor(duration) + ) + + if job_status == JobStatus.FAILED: + # we should be reporting the failure to the server + logger.error("job processing failed", exc_info=ex) + try: + _ = speckle_client.file_import.finish_file_import_job( + FileImportErrorInput( + project_id=job.payload.project_id, + # the blob id identifies the job to the server + job_id=job.payload.blob_id, + reason=str(ex), + result=FileImportResult( + parser=parser, + version_id=None, + download_duration_seconds=0, + duration_seconds=time.time() - start, + parse_duration_seconds=0, + ), + ) + ) + # the server is responsible for moving failed jobs to the + # failed state + # so the worker does not have to do anything further + except Exception as ex: + logger.error("failed to report job failure", exc_info=ex) + # somehow we're in a weird state, + # let's return the job to the queued state + # where it will get picked up again until one of total timeout, + # max attempts, or exhausted compute budget is reached + # The server is responsible for garbage collecting jobs + # which have reached these error conditions and moving + # them to a failed status. + await return_job_to_queued(connection, logger, job_id) + elif job_status == JobStatus.SUCCEEDED: + # do nothing + # we expect the job to already be marked as succeeded in the + # database by the server (when the worker reported + # the results back to the server) + continue diff --git a/packages/ifc-import-service/src/ifc_importer/job_processor.py b/packages/ifc-import-service/src/ifc_importer/job_processor.py deleted file mode 100644 index a103e727c..000000000 --- a/packages/ifc-import-service/src/ifc_importer/job_processor.py +++ /dev/null @@ -1,204 +0,0 @@ -import asyncio -import tempfile -import time -from math import floor -from pathlib import Path - -import structlog -from speckleifc.main import open_and_convert_file -from specklepy.core.api.client import ( # pyright: ignore[reportMissingTypeStubs] - SpeckleClient, -) -from specklepy.core.api.inputs.file_import_inputs import ( - FileImportErrorInput, - FileImportResult, - FileImportSuccessInput, -) -from specklepy.core.api.models import Version -from specklepy.logging import metrics - -from ifc_importer.domain import FileimportPayload, JobStatus -from ifc_importer.repository import ( - deduct_from_compute_budget, - get_next_job, - return_job_to_queued, - setup_connection, -) - -IDLE_TIMEOUT = 1 - - -def setup_client(job_payload: FileimportPayload) -> SpeckleClient: - speckle_client = SpeckleClient( - job_payload.server_url, - job_payload.server_url.startswith("https"), - ) - speckle_client.authenticate_with_token(job_payload.token) - if not speckle_client.account: - msg = ( - f"Could not authenticate to {job_payload.server_url}", - "with the provided token", - ) - raise ValueError(msg) - - if not speckle_client.account.userInfo.email: - raise ValueError( - "activeUser.email did not get fetched. Does the token lack profile:email?" - ) - - return speckle_client - - -async def job_handler( - client: SpeckleClient, job: FileimportPayload, logger: structlog.stdlib.BoundLogger -) -> tuple[Version, float, float]: - # this will create a new temp file and also close it, when the with block closes - with tempfile.NamedTemporaryFile() as file: - start = time.time() - file_path = client.file_import.download_file( - job.project_id, job.blob_id, Path(file.name) - ) - download_end = time.time() - download_duration = download_end - start - logger.info( - "Finished source file download after {download_duration}", - download_duration=download_duration, - ) - project = client.project.get(job.project_id) - - version = open_and_convert_file( - file_path=str(file_path), - client=client, - project=project, - model_id=job.model_id, - version_message=f"Created from {job.file_name} upload.", - ) - parse_end = time.time() - parse_duration = parse_end - download_end - logger.info( - "Finished parsing after {parse_duration}", - parse_duration=parse_duration, - ) - return version, download_duration, parse_duration - - -async def job_processor(logger: structlog.stdlib.BoundLogger): - parser = "speckle_ifc" - logger = logger.bind(parser=parser) - connection = await setup_connection() - logger.info("job processor started") - while True: - job = await get_next_job(connection) - if not job: - logger.debug("no job found") - await asyncio.sleep(IDLE_TIMEOUT) - continue - - start = time.time() - duration = 0 - job_timeout = max( - 1, min(job.payload.time_out_seconds, job.remaining_compute_budget_seconds) - ) - - # Forcefully reset metrics, we don't want it to reuse any server/user ids between jobs - metrics.METRICS_TRACKER = None - metrics.HOST_APP = "ifc" - - speckle_client = setup_client(job.payload) - - job_id = job.id - job_status = JobStatus.QUEUED - ex: Exception | None = None - attempt = job.attempt - version_id: str | None = None - - try: - if attempt > job.max_attempt: - # something went wrong, it should have been marked as failed - raise Exception( - "Unhandled error silently failed the job multiple times" - ) - - logger = logger.bind(job_id=job_id, project_id=job.payload.project_id) - logger.info( - "starting job {job_id} for project {project_id}, attempt {attempt} / {max_attempts} with remaining compute budget {remaining_compute_budget_seconds}s and timeout {job_timeout}s", - attempt=attempt, - max_attempts=job.max_attempt, - remaining_compute_budget_seconds=job.remaining_compute_budget_seconds, - job_timeout=job_timeout, - ) - handler = job_handler(speckle_client, job.payload, logger) - # this will raise a TimeoutError if handler does not complete in time - version, download_duration, parse_duration = await asyncio.wait_for( - handler, timeout=job_timeout - ) - version_id = version.id - - duration = time.time() - start - logger.info( - "Finished parsing job after {duration}s, creating version {version_id}", - duration=duration, - version_id=version_id, - ) - - _ = speckle_client.file_import.finish_file_import_job( - FileImportSuccessInput( - project_id=job.payload.project_id, - # the blob id identifies the "job" here - job_id=job.payload.blob_id, - result=FileImportResult( - parser=parser, - version_id=version_id, - download_duration_seconds=download_duration, - duration_seconds=duration, - parse_duration_seconds=parse_duration, - ), - ) - ) - # the server is responsible for moving successful jobs to the succeeded state - # mark it as succeeded so we do not enter any error handling routines on finalisation - job_status = JobStatus.SUCCEEDED - - # raised if the task is canceled - except Exception as e: - # - ex = e - job_status = JobStatus.FAILED - finally: - if duration <= 0: - # it probably failed before we calculated the duration, so calculate it now - duration = time.time() - start - await deduct_from_compute_budget( - connection, logger, job_id, floor(duration) - ) - - if job_status == JobStatus.FAILED: - # we should be reporting the failure to the server - logger.error("job processing failed", exc_info=ex) - try: - _ = speckle_client.file_import.finish_file_import_job( - FileImportErrorInput( - project_id=job.payload.project_id, - # the blob id identifies the job to the server - job_id=job.payload.blob_id, - reason=str(ex), - result=FileImportResult( - parser=parser, - version_id=None, - download_duration_seconds=0, - duration_seconds=time.time() - start, - parse_duration_seconds=0, - ), - ) - ) - # the server is responsible for moving failed jobs to the failed state, so the worker does not have to do anything further - except Exception as ex: - logger.error("failed to report job failure", exc_info=ex) - # somehow we're in a weird state, let's return the job to the queued state - # where it will get picked up again until one of total timeout, max attempts, or exhausted compute budget is reached - # The server is responsible for garbage collecting jobs which have reached these error conditions and moving them to a failed status. - await return_job_to_queued(connection, logger, job_id) - elif job_status == JobStatus.SUCCEEDED: - # do nothing - # we expect the job to already be marked as succeeded in the database by the server (when the worker reported the results back to the server) - continue diff --git a/packages/ifc-import-service/src/ifc_importer/process_job.py b/packages/ifc-import-service/src/ifc_importer/process_job.py new file mode 100644 index 000000000..381d89034 --- /dev/null +++ b/packages/ifc-import-service/src/ifc_importer/process_job.py @@ -0,0 +1,65 @@ +import time +import traceback +from pathlib import Path +from pprint import pprint + +from speckleifc.main import open_and_convert_file +from specklepy.logging import metrics + +from ifc_importer.client import setup_client +from ifc_importer.domain import ( + FileimportError, + FileimportPayload, + FileimportResult, + FileimportSuccess, +) + + +def process_job( + work_dir_path: str, + job_payload: str, +) -> None: + work_dir = Path(work_dir_path) + outcome = None + try: + # Forcefully reset metrics, + # we don't want it to reuse any server/user ids between jobs + metrics.METRICS_TRACKER = None + metrics.HOST_APP = "ifc" + print(job_payload) + + job = FileimportPayload.model_validate_json(job_payload) + start = time.time() + + client = setup_client(job) + + file_path = client.file_import.download_file( + job.project_id, job.blob_id, work_dir.joinpath(job.file_name) + ) + download_end = time.time() + download_duration = download_end - start + project = client.project.get(job.project_id) + + version = open_and_convert_file( + file_path=str(file_path), + client=client, + project=project, + model_id=job.model_id, + version_message=f"Created from {job.file_name} upload.", + ) + parse_end = time.time() + parse_duration = parse_end - download_end + outcome = FileimportSuccess( + version_id=version.id, + download_duration_seconds=download_duration, + parse_duration_seconds=parse_duration, + ) + except Exception as ex: + stack_trace = traceback.format_exc() + outcome = FileimportError(reason=str(ex), stack_trace=stack_trace) + finally: + if outcome: + pprint(outcome) + work_dir.joinpath("result.json").write_text( + FileimportResult(outcome=outcome).model_dump_json() + ) diff --git a/packages/ifc-import-service/src/ifc_importer/repository.py b/packages/ifc-import-service/src/ifc_importer/repository.py index 128132c3b..ac184bdea 100644 --- a/packages/ifc-import-service/src/ifc_importer/repository.py +++ b/packages/ifc-import-service/src/ifc_importer/repository.py @@ -31,17 +31,21 @@ async def get_next_job(connection: Connection) -> FileimportJob | None: "updatedAt" = NOW() WHERE id = ( SELECT id FROM background_jobs - WHERE ( -- job in a QUEUED state which has not yet exceeded maximum attempts and has a positive remaining compute budget + WHERE ( -- job in a QUEUED state which has not yet + -- exceeded maximum attempts + -- and has a positive remaining compute budget payload ->> 'fileType' = 'ifc' AND status = $2 AND "attempt" < "maxAttempt" AND "remainingComputeBudgetSeconds"::int > 0 ) - OR ( -- any job left in a PROCESSING state for more than its timeout period + OR ( -- any job left in a PROCESSING state + -- for more than its timeout period payload ->> 'fileType' = 'ifc' AND status = $1 AND "attempt" <= "maxAttempt" - AND "updatedAt" < NOW() - (payload ->> 'timeOutSeconds')::int * interval '1 second' + AND "updatedAt" < NOW() - (payload ->> 'timeOutSeconds')::int + * interval '1 second' ) ORDER BY "createdAt" FOR UPDATE SKIP LOCKED @@ -96,7 +100,8 @@ async def deduct_from_compute_budget( used_compute_time_seconds: int, ) -> None: logger.info( - "updating job: {job_id}'s remaining compute budget by deducting {used_compute_time_seconds} seconds", + "updating job: {job_id}'s remaining compute budget by deducting" + + " {used_compute_time_seconds} seconds", job_id=job_id, used_compute_time_seconds=used_compute_time_seconds, )