feat(ifc importer): respect timeouts (#5561)

* wip ifc subprocess rework

* feat(ifc-import-service): move the import to a subprocess

* feat(ifc): fix imports

* fix(ifc-import-service): add job_processor to the docker container

* fix(ifc-import-service): wrap the timeout exception and kill the process
This commit is contained in:
Gergő Jedlicska
2025-10-07 14:07:34 +02:00
committed by GitHub
parent cb29d5c6e4
commit c16147463f
10 changed files with 339 additions and 214 deletions
+2 -1
View File
@@ -36,13 +36,14 @@ ENV PATH=/app/bin:$PATH
# Don't run your app as root.
RUN groupadd -r app && \
useradd -r -d /app -g app -N app
useradd -r -d /app -g app -N app
STOPSIGNAL SIGINT
COPY --from=build --chown=app:app /app /app
COPY --chown=app:app ./packages/ifc-import-service/main.py /app
COPY --chown=app:app ./packages/ifc-import-service/job_processor.py /app
# COPY --chown=appuser:appuser . .
#
# Switch to non-root user
@@ -0,0 +1,8 @@
import base64
from sys import argv
from ifc_importer.process_job import process_job
if __name__ == "__main__":
    # Expected invocation: <script> <workdir_path> <base64 job payload>.
    # The unpacking raises ValueError unless exactly two arguments follow
    # the script name.
    _, workdir_path, job_payload = argv
    # The payload arrives base64-encoded; turn it back into text before
    # handing it to the importer.
    decoded_payload = base64.b64decode(job_payload).decode()
    _ = process_job(workdir_path, decoded_payload)
+2 -2
View File
@@ -8,7 +8,7 @@ import structlog
from prometheus_client import start_http_server
from structlog_to_seq import CelfProcessor
from ifc_importer.job_processor import job_processor
from ifc_importer.job_manager import job_manager
def configure_logger() -> structlog.stdlib.BoundLogger:
@@ -49,7 +49,7 @@ class HealthcheckHTTPRequestHandler(BaseHTTPRequestHandler):
async def main():
logger = configure_logger()
task = asyncio.create_task(job_processor(logger))
task = asyncio.create_task(job_manager(logger))
httpd = HTTPServer(("0.0.0.0", 9080), HealthcheckHTTPRequestHandler)
healthcheck_server_process = Process(target=httpd.serve_forever, daemon=True)
healthcheck_server_process.start()
@@ -1,3 +0,0 @@
from ifc_importer.job_processor import job_processor
__all__ = ["job_processor"]
@@ -0,0 +1,26 @@
from specklepy.core.api.client import ( # pyright: ignore[reportMissingTypeStubs]
SpeckleClient,
)
from ifc_importer.domain import FileimportPayload
def setup_client(job_payload: FileimportPayload) -> SpeckleClient:
    """Create a SpeckleClient authenticated with the job's token.

    Args:
        job_payload: Job description providing ``server_url`` and ``token``.

    Returns:
        A client for ``job_payload.server_url`` that has an account with a
        user email attached.

    Raises:
        ValueError: If the client has no account after authenticating, or the
            account's user info has no email.
    """
    speckle_client = SpeckleClient(
        job_payload.server_url,
        # second positional argument is derived from the URL scheme
        job_payload.server_url.startswith("https"),
    )
    speckle_client.authenticate_with_token(job_payload.token)

    if not speckle_client.account:
        # Adjacent string literals form ONE message; a comma here would build
        # a tuple and ValueError would render it as "('...', '...')".
        msg = (
            f"Could not authenticate to {job_payload.server_url}"
            " with the provided token"
        )
        raise ValueError(msg)

    if not speckle_client.account.userInfo.email:
        raise ValueError(
            "activeUser.email did not get fetched. Does the token lack profile:email?"
        )

    return speckle_client
@@ -27,6 +27,21 @@ class FileimportPayload(JobBase):
time_out_seconds: int
class FileimportSuccess(JobBase):
    """Outcome of an import run that produced a version."""

    # elapsed seconds measured up to the end of the source file download
    download_duration_seconds: float
    # elapsed seconds between the end of the download and the end of conversion
    parse_duration_seconds: float
    # id of the version created by the import
    version_id: str
class FileimportError(JobBase):
    """Outcome of an import run that raised an exception."""

    # string form of the exception that aborted the import
    reason: str
    # formatted traceback captured where the exception was caught
    stack_trace: str
class FileimportResult(JobBase):
    """Envelope around the outcome of a single import run."""

    # either the success details or the error details of the run
    outcome: FileimportSuccess | FileimportError
class JobStatus(StrEnum):
"""Status enumeration for the job."""
@@ -0,0 +1,212 @@
import asyncio
import base64
import tempfile
import time
from math import floor
from pathlib import Path
import structlog
from specklepy.core.api.inputs.file_import_inputs import (
FileImportErrorInput,
FileImportResult,
FileImportSuccessInput,
)
from specklepy.logging import metrics
from ifc_importer.client import setup_client
from ifc_importer.domain import FileimportError, FileimportResult, JobStatus
from ifc_importer.repository import (
deduct_from_compute_budget,
get_next_job,
return_job_to_queued,
setup_connection,
)
IDLE_TIMEOUT = 1
async def _run_import_subprocess(
    work_dir: str, encoded_payload: str, timeout_seconds: float
) -> int:
    """Run ``job_processor.py`` in a child process and return its exit code.

    Args:
        work_dir: Directory path passed to the child as its first argument.
        encoded_payload: Base64-encoded JSON job payload, passed as the
            second argument.
        timeout_seconds: Maximum time to wait for the child to exit.

    Returns:
        The exit code of the child process.

    Raises:
        Exception: If the child is still running after ``timeout_seconds``.
            The child is killed and reaped before the exception is raised.
    """
    # Spawn the interpreter directly instead of going through a shell: the
    # arguments need no quoting, and kill() below targets the importer itself
    # rather than an intermediate shell process.
    # NOTE(review): the payload appears to include the auth token and is
    # visible in the process list as a command line argument - confirm
    # whether that is acceptable.
    process = await asyncio.create_subprocess_exec(
        "python", "job_processor.py", work_dir, encoded_payload
    )
    try:
        return await asyncio.wait_for(process.wait(), timeout=timeout_seconds)
    except TimeoutError as te:
        process.kill()
        # reap the killed child so it is fully gone before the caller
        # cleans up the working directory
        _ = await process.wait()
        raise Exception(
            "Job was cancelled due to reaching the"
            + f" {timeout_seconds} second timeout"
        ) from te


async def job_manager(logger: structlog.stdlib.BoundLogger):
    """Poll for IFC import jobs forever and run each one in a subprocess.

    For every job: a temporary working directory is created, the importer
    subprocess is run with a timeout, ``result.json`` is read back from the
    working directory, and the success or failure is reported to the server.
    The elapsed time is always deducted from the job's compute budget.

    Args:
        logger: Logger used for all messages; it gets ``parser``, ``job_id``
            and ``project_id`` bound to it along the way.
    """
    parser = "speckle_ifc"
    logger = logger.bind(parser=parser)

    connection = await setup_connection()
    logger.info("job processor started")

    while True:
        job = await get_next_job(connection)
        if not job:
            await asyncio.sleep(IDLE_TIMEOUT)
            continue

        start = time.time()
        duration = 0
        # never wait less than a second, never longer than the job's own
        # timeout or what is left of its compute budget
        job_timeout = max(
            1, min(job.payload.time_out_seconds, job.remaining_compute_budget_seconds)
        )

        # Forcefully reset metrics,
        # we don't want it to reuse any server/user ids between jobs
        metrics.METRICS_TRACKER = None
        metrics.HOST_APP = "ifc"

        # NOTE(review): this runs outside the try block below, so an
        # exception raised here propagates out of job_manager - confirm
        # that this is intended.
        speckle_client = setup_client(job.payload)
        job_id = job.id
        job_status = JobStatus.QUEUED
        ex: Exception | None = None
        attempt = job.attempt
        version_id: str | None = None

        # this will create a new temp directory and also delete it,
        # when the with block closes
        with tempfile.TemporaryDirectory() as temp_dir:
            try:
                if attempt > job.max_attempt:
                    # something went wrong, it should have been marked as failed
                    raise Exception(
                        "Unhandled error silently failed the job multiple times"
                    )

                logger = logger.bind(job_id=job_id, project_id=job.payload.project_id)
                logger.info(
                    "starting job {job_id} for project {project_id},"
                    + " attempt {attempt} /"
                    + " {max_attempts} with remaining compute budget"
                    + " {remaining_compute_budget_seconds}s and timeout {job_timeout}s",
                    attempt=attempt,
                    max_attempts=job.max_attempt,
                    remaining_compute_budget_seconds=job.remaining_compute_budget_seconds,
                    job_timeout=job_timeout,
                )

                encoded_payload = base64.b64encode(
                    job.payload.model_dump_json().encode()
                ).decode()
                exit_code = await _run_import_subprocess(
                    temp_dir, encoded_payload, job_timeout
                )

                # a timeout has already raised above, so a non-zero exit code
                # here means the importer itself exited abnormally
                if exit_code != 0:
                    raise Exception(f"Job failed with exit code {exit_code}")

                result_path = Path(temp_dir, "result.json")
                if not result_path.exists():
                    raise Exception("Job exited without a result")

                outcome = FileimportResult.model_validate_json(
                    result_path.read_text()
                ).outcome

                if isinstance(outcome, FileimportError):
                    # the traceback from the child is a plain string, not an
                    # exception object, so log it as a regular field
                    logger.error(
                        "File import subprocess failed",
                        stack_trace=outcome.stack_trace,
                    )
                    raise Exception(outcome.reason)

                version_id = outcome.version_id
                duration = time.time() - start

                logger.info(
                    "Finished parsing job after {duration}s,"
                    + " creating version {version_id}",
                    duration=duration,
                    version_id=version_id,
                )

                _ = speckle_client.file_import.finish_file_import_job(
                    FileImportSuccessInput(
                        project_id=job.payload.project_id,
                        # the blob id identifies the "job" here
                        job_id=job.payload.blob_id,
                        result=FileImportResult(
                            parser=parser,
                            version_id=version_id,
                            download_duration_seconds=outcome.download_duration_seconds,
                            duration_seconds=duration,
                            parse_duration_seconds=outcome.parse_duration_seconds,
                        ),
                    )
                )
                # the server is responsible for moving successful
                # jobs to the succeeded state
                # mark it as succeeded so we do not enter any error
                # handling routines on finalisation
                job_status = JobStatus.SUCCEEDED

            except Exception as e:
                ex = e
                job_status = JobStatus.FAILED
            finally:
                if duration <= 0:
                    # it probably failed before we calculated the duration,
                    # so calculate it now
                    duration = time.time() - start
                await deduct_from_compute_budget(
                    connection, logger, job_id, floor(duration)
                )

                if job_status == JobStatus.FAILED:
                    # we should be reporting the failure to the server
                    logger.error("job processing failed", exc_info=ex)
                    try:
                        _ = speckle_client.file_import.finish_file_import_job(
                            FileImportErrorInput(
                                project_id=job.payload.project_id,
                                # the blob id identifies the job to the server
                                job_id=job.payload.blob_id,
                                reason=str(ex),
                                result=FileImportResult(
                                    parser=parser,
                                    version_id=None,
                                    download_duration_seconds=0,
                                    duration_seconds=time.time() - start,
                                    parse_duration_seconds=0,
                                ),
                            )
                        )
                        # the server is responsible for moving failed jobs to the
                        # failed state
                        # so the worker does not have to do anything further
                    except Exception as report_error:
                        logger.error(
                            "failed to report job failure", exc_info=report_error
                        )
                        # somehow we're in a weird state,
                        # let's return the job to the queued state
                        # where it will get picked up again until one of total
                        # timeout, max attempts, or exhausted compute budget is
                        # reached
                        # The server is responsible for garbage collecting jobs
                        # which have reached these error conditions and moving
                        # them to a failed status.
                        await return_job_to_queued(connection, logger, job_id)
                # On success there is nothing left to do: we expect the job to
                # already be marked as succeeded in the database by the server
                # (when the worker reported the results back to the server).
                # No `continue` here - jumping out of a finally block would
                # silently discard any exception that is still propagating.
@@ -1,204 +0,0 @@
import asyncio
import tempfile
import time
from math import floor
from pathlib import Path
import structlog
from speckleifc.main import open_and_convert_file
from specklepy.core.api.client import ( # pyright: ignore[reportMissingTypeStubs]
SpeckleClient,
)
from specklepy.core.api.inputs.file_import_inputs import (
FileImportErrorInput,
FileImportResult,
FileImportSuccessInput,
)
from specklepy.core.api.models import Version
from specklepy.logging import metrics
from ifc_importer.domain import FileimportPayload, JobStatus
from ifc_importer.repository import (
deduct_from_compute_budget,
get_next_job,
return_job_to_queued,
setup_connection,
)
IDLE_TIMEOUT = 1
def setup_client(job_payload: FileimportPayload) -> SpeckleClient:
    """Create a SpeckleClient authenticated with the job's token.

    Args:
        job_payload: Job description providing ``server_url`` and ``token``.

    Returns:
        A client for ``job_payload.server_url`` that has an account with a
        user email attached.

    Raises:
        ValueError: If the client has no account after authenticating, or the
            account's user info has no email.
    """
    speckle_client = SpeckleClient(
        job_payload.server_url,
        # second positional argument is derived from the URL scheme
        job_payload.server_url.startswith("https"),
    )
    speckle_client.authenticate_with_token(job_payload.token)

    if not speckle_client.account:
        # Adjacent string literals form ONE message; a comma here would build
        # a tuple and ValueError would render it as "('...', '...')".
        msg = (
            f"Could not authenticate to {job_payload.server_url}"
            " with the provided token"
        )
        raise ValueError(msg)

    if not speckle_client.account.userInfo.email:
        raise ValueError(
            "activeUser.email did not get fetched. Does the token lack profile:email?"
        )

    return speckle_client
async def job_handler(
    client: SpeckleClient, job: FileimportPayload, logger: structlog.stdlib.BoundLogger
) -> tuple[Version, float, float]:
    """Download the job's source file and convert it into a version.

    Args:
        client: Client used for the download and the project lookup; it is
            also handed to the converter.
        job: Payload naming the project, blob, model and file name.
        logger: Logger receiving the two timing messages.

    Returns:
        A ``(version, download_duration, parse_duration)`` tuple, with both
        durations in seconds.
    """
    # the temporary file is closed again when the with block is left
    with tempfile.NamedTemporaryFile() as scratch_file:
        started_at = time.time()
        download_target = Path(scratch_file.name)
        file_path = client.file_import.download_file(
            job.project_id, job.blob_id, download_target
        )

        downloaded_at = time.time()
        download_duration = downloaded_at - started_at
        logger.info(
            "Finished source file download after {download_duration}",
            download_duration=download_duration,
        )

        project = client.project.get(job.project_id)
        version_message = f"Created from {job.file_name} upload."
        version = open_and_convert_file(
            file_path=str(file_path),
            client=client,
            project=project,
            model_id=job.model_id,
            version_message=version_message,
        )

        parse_duration = time.time() - downloaded_at
        logger.info(
            "Finished parsing after {parse_duration}",
            parse_duration=parse_duration,
        )

        return version, download_duration, parse_duration
async def job_processor(logger: structlog.stdlib.BoundLogger):
    """Poll for IFC import jobs forever and process each one in-process.

    Each job is handled by ``job_handler`` under a timeout, its result is
    reported to the server, and the elapsed time is deducted from the job's
    compute budget whether it succeeded or failed.

    Args:
        logger: Logger used for all messages; it gets ``parser``, ``job_id``
            and ``project_id`` bound to it along the way.
    """
    parser = "speckle_ifc"
    logger = logger.bind(parser=parser)

    connection = await setup_connection()
    logger.info("job processor started")

    while True:
        job = await get_next_job(connection)
        if not job:
            logger.debug("no job found")
            await asyncio.sleep(IDLE_TIMEOUT)
            continue

        start = time.time()
        duration = 0
        # never wait less than a second, never longer than the job's own
        # timeout or what is left of its compute budget
        job_timeout = max(
            1, min(job.payload.time_out_seconds, job.remaining_compute_budget_seconds)
        )

        # Forcefully reset metrics,
        # we don't want it to reuse any server/user ids between jobs
        metrics.METRICS_TRACKER = None
        metrics.HOST_APP = "ifc"

        # NOTE(review): runs outside the try block below, so an exception
        # raised here propagates out of job_processor - confirm intended.
        speckle_client = setup_client(job.payload)
        job_id = job.id
        job_status = JobStatus.QUEUED
        ex: Exception | None = None
        attempt = job.attempt
        version_id: str | None = None

        try:
            if attempt > job.max_attempt:
                # something went wrong, it should have been marked as failed
                raise Exception(
                    "Unhandled error silently failed the job multiple times"
                )

            logger = logger.bind(job_id=job_id, project_id=job.payload.project_id)
            logger.info(
                "starting job {job_id} for project {project_id}, attempt {attempt} / {max_attempts} with remaining compute budget {remaining_compute_budget_seconds}s and timeout {job_timeout}s",
                attempt=attempt,
                max_attempts=job.max_attempt,
                remaining_compute_budget_seconds=job.remaining_compute_budget_seconds,
                job_timeout=job_timeout,
            )

            handler = job_handler(speckle_client, job.payload, logger)

            # this will raise a TimeoutError if handler does not complete in time
            # NOTE(review): job_handler contains no await, so once it starts
            # running this timeout likely cannot interrupt it - confirm.
            version, download_duration, parse_duration = await asyncio.wait_for(
                handler, timeout=job_timeout
            )
            version_id = version.id
            duration = time.time() - start

            logger.info(
                "Finished parsing job after {duration}s, creating version {version_id}",
                duration=duration,
                version_id=version_id,
            )

            _ = speckle_client.file_import.finish_file_import_job(
                FileImportSuccessInput(
                    project_id=job.payload.project_id,
                    # the blob id identifies the "job" here
                    job_id=job.payload.blob_id,
                    result=FileImportResult(
                        parser=parser,
                        version_id=version_id,
                        download_duration_seconds=download_duration,
                        duration_seconds=duration,
                        parse_duration_seconds=parse_duration,
                    ),
                )
            )
            # the server is responsible for moving successful jobs to the
            # succeeded state
            # mark it as succeeded so we do not enter any error handling
            # routines on finalisation
            job_status = JobStatus.SUCCEEDED

        # any failure above (including the timeout) marks the job as failed;
        # the exception is kept so it can be logged and reported below
        except Exception as e:
            ex = e
            job_status = JobStatus.FAILED
        finally:
            if duration <= 0:
                # it probably failed before we calculated the duration,
                # so calculate it now
                duration = time.time() - start
            await deduct_from_compute_budget(
                connection, logger, job_id, floor(duration)
            )

            if job_status == JobStatus.FAILED:
                # we should be reporting the failure to the server
                logger.error("job processing failed", exc_info=ex)
                try:
                    _ = speckle_client.file_import.finish_file_import_job(
                        FileImportErrorInput(
                            project_id=job.payload.project_id,
                            # the blob id identifies the job to the server
                            job_id=job.payload.blob_id,
                            reason=str(ex),
                            result=FileImportResult(
                                parser=parser,
                                version_id=None,
                                download_duration_seconds=0,
                                duration_seconds=time.time() - start,
                                parse_duration_seconds=0,
                            ),
                        )
                    )
                    # the server is responsible for moving failed jobs to the
                    # failed state, so the worker does not have to do anything
                    # further
                except Exception as ex:
                    logger.error("failed to report job failure", exc_info=ex)
                    # somehow we're in a weird state, let's return the job to
                    # the queued state where it will get picked up again until
                    # one of total timeout, max attempts, or exhausted compute
                    # budget is reached
                    # The server is responsible for garbage collecting jobs
                    # which have reached these error conditions and moving
                    # them to a failed status.
                    await return_job_to_queued(connection, logger, job_id)
            elif job_status == JobStatus.SUCCEEDED:
                # do nothing
                # we expect the job to already be marked as succeeded in the
                # database by the server (when the worker reported the results
                # back to the server)
                continue
@@ -0,0 +1,65 @@
import time
import traceback
from pathlib import Path
from pprint import pprint
from speckleifc.main import open_and_convert_file
from specklepy.logging import metrics
from ifc_importer.client import setup_client
from ifc_importer.domain import (
FileimportError,
FileimportPayload,
FileimportResult,
FileimportSuccess,
)
def process_job(
    work_dir_path: str,
    job_payload: str,
) -> None:
    """Run one file import and write its outcome to ``result.json``.

    The source file is downloaded into the working directory and converted
    into a version. Any exception raised along the way is caught and recorded
    as a ``FileimportError``, so the caller reads the outcome from
    ``<work_dir_path>/result.json`` instead of handling exceptions.

    Args:
        work_dir_path: Directory receiving the downloaded file and
            ``result.json``.
        job_payload: JSON text that validates as a ``FileimportPayload``.
    """
    work_dir = Path(work_dir_path)
    outcome: FileimportSuccess | FileimportError | None = None
    try:
        # Forcefully reset metrics,
        # we don't want it to reuse any server/user ids between jobs
        metrics.METRICS_TRACKER = None
        metrics.HOST_APP = "ifc"

        # The raw payload is deliberately not printed: it carries the auth
        # token, which must not end up in the process output.
        job = FileimportPayload.model_validate_json(job_payload)
        start = time.time()
        client = setup_client(job)

        # Use only the final path component of the supplied file name so the
        # download target cannot point outside the working directory.
        download_target = work_dir.joinpath(Path(job.file_name).name)
        file_path = client.file_import.download_file(
            job.project_id, job.blob_id, download_target
        )

        download_end = time.time()
        download_duration = download_end - start

        project = client.project.get(job.project_id)
        version = open_and_convert_file(
            file_path=str(file_path),
            client=client,
            project=project,
            model_id=job.model_id,
            version_message=f"Created from {job.file_name} upload.",
        )

        parse_end = time.time()
        parse_duration = parse_end - download_end
        outcome = FileimportSuccess(
            version_id=version.id,
            download_duration_seconds=download_duration,
            parse_duration_seconds=parse_duration,
        )
    except Exception as ex:
        stack_trace = traceback.format_exc()
        outcome = FileimportError(reason=str(ex), stack_trace=stack_trace)
    finally:
        # outcome is still None only when something that is not an Exception
        # (e.g. KeyboardInterrupt) is propagating; no result is written then
        if outcome is not None:
            pprint(outcome)
            work_dir.joinpath("result.json").write_text(
                FileimportResult(outcome=outcome).model_dump_json()
            )
@@ -31,17 +31,21 @@ async def get_next_job(connection: Connection) -> FileimportJob | None:
"updatedAt" = NOW()
WHERE id = (
SELECT id FROM background_jobs
WHERE ( -- job in a QUEUED state which has not yet exceeded maximum attempts and has a positive remaining compute budget
WHERE ( -- job in a QUEUED state which has not yet
-- exceeded maximum attempts
-- and has a positive remaining compute budget
payload ->> 'fileType' = 'ifc'
AND status = $2
AND "attempt" < "maxAttempt"
AND "remainingComputeBudgetSeconds"::int > 0
)
OR ( -- any job left in a PROCESSING state for more than its timeout period
OR ( -- any job left in a PROCESSING state
-- for more than its timeout period
payload ->> 'fileType' = 'ifc'
AND status = $1
AND "attempt" <= "maxAttempt"
AND "updatedAt" < NOW() - (payload ->> 'timeOutSeconds')::int * interval '1 second'
AND "updatedAt" < NOW() - (payload ->> 'timeOutSeconds')::int
* interval '1 second'
)
ORDER BY "createdAt"
FOR UPDATE SKIP LOCKED
@@ -96,7 +100,8 @@ async def deduct_from_compute_budget(
used_compute_time_seconds: int,
) -> None:
logger.info(
"updating job: {job_id}'s remaining compute budget by deducting {used_compute_time_seconds} seconds",
"updating job: {job_id}'s remaining compute budget by deducting"
+ " {used_compute_time_seconds} seconds",
job_id=job_id,
used_compute_time_seconds=used_compute_time_seconds,
)