Files
speckle-sharp-connectors/Importers/Rhino/Speckle.Importers.JobProcessor/JobProcessor.cs
T
Jedd Morgan 7860c44f4e feat(api)!: Implement new packfile based sends via SendPipeline (aka DuckDB changes) (#1277)
* Dim/quack lets go (#1275)

* Add model ingestion to sharp connectors

* correct ingestion message

* Progress

* grasshopper

* GH exception messages

* fix GH

* file names

* revit file name

* grasshopper file names

* etabs file names

* delete tests

* tekla maybe

* ingestion scope

* bad boolean logic

* Longer TimeSpan

* wip upload pipe

* 10s

* passthrough ingestion id

* happy hack time: prevent ingestion completion

this is handled server-side in the processing logic.

* add packfile send endpoint detection and routing

Route to SendViaPackfile when the server supports the upload-signing
endpoint (POST probe, 404 = unsupported) and a continuous traversal
builder is registered.

* Adds Continuous Traversal Builder

Introduces a Continuous Traversal Builder to manage the conversion and processing of Revit elements within a Send Pipeline.

---------

Co-authored-by: Jedd Morgan <45512892+JR-Morgan@users.noreply.github.com>

* feat(api): DI Refactor for Duck DB + Gergo's API endpoint changes (#1282)

* Di

* undo accidental change

* Feat (duck): dui ingestion updates post upload (#1295)

* Pass optional ingestion id to DUI

* Make ingestion id null for the SendViaIngestion, see the note :)

* feat!: Duckdev progress reporting (#1296)

* Di

* throwaway from laptop

* Progress reporting

* Use matching logger

* Revit and revert rhino unpacker progress

* more reverts

* make pr even cleaner

* and this one

* fix build issues with other connectors

* SDK nuget (#1299)

* Bump to 3.14.0-alpha.2

* Feat(duck): grasshopper (#1297)

* Duck x Grasshopper - who would win?

* Fix registration for new builder

* missing imports

* return version id grasshopper

* Align sync resource to sync

---------

Co-authored-by: Jedd Morgan <45512892+JR-Morgan@users.noreply.github.com>

* Bump SDK

* feat(importer): rhino file importer changes for packfile (#1301)

* rhino importer changes

* correct deps

* Bump SDK

* Fix build issues

* ditto

* Fix build issue

* Lower standards

* Fix build

* feat: duck for acad, civil, navis, tekla, etabs (#1300)

* duck: acad, civil, etabs, tekla, navis and bump channels to 10.0.0

* notes

* fix conflicts

* more conflicts

* Ready for testing

* fix(duck): Fix send caching (#1302)

* potential fix

* undo rhino changes

* fix xml comment

* amended comment

* revit

* Fix build

* Aligned converting message

* fix: reoccurring object references

* Bump sdk and resolve merge conflict issues

* Merge pull request #1317 from specklesystems/jrm/importer-tracing

feat(otel): Tracing and OTEL changes for Rhino importer

* Fix revit linked model progress (#1312)

* Revert otel packages

* bump SDK

* Trace unpacking groups

* Align trace context nullability with app

* Disable send caching in Navisworks

* comments

* Update FileimportPayload.cs

* fix using directive

---------

Co-authored-by: Jedd Morgan <45512892+JR-Morgan@users.noreply.github.com>

* Fix merge conflicts

---------

Co-authored-by: Dimitrie Stefanescu <didimitrie@gmail.com>
Co-authored-by: Oğuzhan Koral <45078678+oguzhankoral@users.noreply.github.com>
Co-authored-by: Björn Steinhagen <88777268+bjoernsteinhagen@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Sebastian Witt <sebastian.witt@rwth-aachen.de>
2026-04-08 10:07:56 +01:00

301 lines
11 KiB
C#

using System.Data;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Speckle.Connectors.Common.Extensions;
using Speckle.Connectors.Logging;
using Speckle.Importers.JobProcessor.Domain;
using Speckle.Importers.JobProcessor.JobHandlers;
using Speckle.Importers.JobProcessor.JobQueue;
using Speckle.Sdk;
using Speckle.Sdk.Api;
using Speckle.Sdk.Api.GraphQL.Enums;
using Speckle.Sdk.Api.GraphQL.Inputs;
using Speckle.Sdk.Api.GraphQL.Models;
using Speckle.Sdk.Common;
using Speckle.Sdk.Credentials;
using Speckle.Sdk.Logging;
namespace Speckle.Importers.JobProcessor;
/// <summary>
/// Hosted background service that repeatedly fetches the next file-import job from <see cref="Repository"/>
/// and processes jobs one at a time, reporting each job's outcome to the Speckle server.
/// </summary>
internal sealed class JobProcessorInstance(
  Repository repository,
  ILogger<JobProcessorInstance> logger,
  IJobHandler jobHandler,
  IAccountFactory accountFactory,
  IClientFactory clientFactory,
  ISdkActivityFactory activityFactory
) : BackgroundService
{
  /// <summary>How long to sleep before polling again when no job is available.</summary>
  private static readonly TimeSpan s_idleTimeout = TimeSpan.FromSeconds(1);

  /// <summary>
  /// Host entry point. Runs the job loop until <paramref name="cancellationToken"/> is cancelled.
  /// </summary>
  /// <remarks>
  /// <see cref="OperationCanceledException"/> is rethrown as-is. Any other exception is logged and the process is
  /// terminated with exit code 1, so that the Windows service recovery (retry) policy kicks in.
  /// </remarks>
  protected override async Task ExecuteAsync(CancellationToken cancellationToken)
  {
    try
    {
      await RunJobProcessorLoop(cancellationToken);
    }
    catch (OperationCanceledException)
    {
      throw;
    }
    catch (Exception ex)
    {
      logger.LogError(ex, "Job Processor crashed");
      Environment.Exit(1); //This is the only reliable way I've managed to figure out how to get windows services retry policy to actually kick in (see https://github.com/dotnet/runtime/issues/67146)
      throw;
    }
  }

  /// <summary>
  /// Opens a repository connection, then loops forever.
  /// </summary>
  /// <remarks>
  /// On each iteration it fetches the next job and processes it inside its own tracing activity.
  /// When no job is available it sleeps for <see cref="s_idleTimeout"/>.
  /// The loop only ends by throwing, e.g. <see cref="OperationCanceledException"/> once
  /// <paramref name="serviceCancellationToken"/> is cancelled.
  /// </remarks>
  /// <param name="serviceCancellationToken">Cancelled when the hosting service is stopping.</param>
  private async Task RunJobProcessorLoop(CancellationToken serviceCancellationToken)
  {
    await using var connection = await repository.SetupConnection(serviceCancellationToken).ConfigureAwait(false);
    logger.LogInformation("Listening for jobs...");
    while (true)
    {
      // AttemptJob swallows the cancellation of an in-flight job (after re-queuing it), so check explicitly here.
      // Otherwise we would depend on GetNextJob observing the token, and could claim a new job during shutdown
      // only to hand it straight back.
      serviceCancellationToken.ThrowIfCancellationRequested();

      FileimportJob? job = await repository.GetNextJob(connection, serviceCancellationToken);
      if (job == null)
      {
        logger.LogDebug("No job found, sleeping for {Timeout}", s_idleTimeout);
        await Task.Delay(s_idleTimeout, serviceCancellationToken);
        continue;
      }

      logger.LogInformation(
        "Starting {JobId}, attempt {Attempt} / {MaxAttempts} - it has {ComputeBudgetSeconds}s remaining",
        job.Id,
        job.Attempt,
        job.MaxAttempt,
        job.RemainingComputeBudgetSeconds
      );

      // Continue the caller's trace when the payload carries a trace parent; otherwise start a fresh one.
      using var activity = job.Payload.TraceContext?.TraceParent is not null
        ? activityFactory.StartRemote(job.Payload.TraceContext.TraceParent, SdkActivityKind.Consumer, "Picked up a job")
        : activityFactory.Start("Picked up a job", SdkActivityKind.Consumer);
      using var scopeJobId = ActivityScope.SetTag("jobId", job.Id);
      using var scopeJobType = ActivityScope.SetTag("jobType", job.Payload.JobType);
      using var scopeAttempt = ActivityScope.SetTag("job.attempt", job.Attempt.ToString());
      using var scopeServerUrl = ActivityScope.SetTag("serverUrl", job.Payload.ServerUrl.ToString());
      using var scopeProjectId = ActivityScope.SetTag("projectId", job.Payload.ProjectId);
      using var scopeModelIngestionId = ActivityScope.SetTag("modelIngestion.Id", job.Payload.ModelIngestionId);
      using var scopeBlobId = ActivityScope.SetTag("blobId", job.Payload.BlobId);
      using var scopeFileType = ActivityScope.SetTag("fileType", job.Payload.FileType);

      try
      {
        await AttemptJob(job, connection, serviceCancellationToken);
        activity?.SetStatus(SdkActivityStatusCode.Ok);
      }
      catch (Exception ex)
      {
        // This is a very exceptional case, something is wrong with our infra
        activity?.RecordException(ex);
        activity?.SetStatus(SdkActivityStatusCode.Error);
        throw;
      }
    }
  }

  /// <summary>
  /// Marks the job's model ingestion as cancelled on the server, then logs the cancellation.
  /// </summary>
  /// <remarks>
  /// The server call uses <see cref="CancellationToken.None"/>, so the report itself cannot be cancelled.
  /// </remarks>
  private async Task ReportCancelled(FileimportJob job, IClient client, Exception ex, double elapsedSeconds)
  {
    await client.Ingestion.FailWithCancel(
      new ModelIngestionCancelledInput(
        job.Payload.ModelIngestionId,
        job.Payload.ProjectId,
        "The ingestion handler observed a cancellation request, and has cancelled the ingestion before its completion"
      ),
      CancellationToken.None
    );
    logger.LogInformation(
      ex,
      "Attempt {Attempt} to process {JobId} cancelled after {ElapsedSeconds}",
      job.Attempt,
      job.Id,
      elapsedSeconds
    );
  }

  /// <summary>
  /// Marks the job's model ingestion as failed on the server, then logs the failure.
  /// </summary>
  /// <remarks>The failure details are built from <paramref name="ex"/>.</remarks>
  private async Task ReportFailed(
    FileimportJob job,
    IClient client,
    Exception ex,
    double elapsedSeconds,
    CancellationToken cancellationToken
  )
  {
    await client.Ingestion.FailWithError(
      ModelIngestionFailedInput.FromException(job.Payload.ModelIngestionId, job.Payload.ProjectId, ex),
      cancellationToken
    );
    logger.LogError(
      ex,
      "Attempt {Attempt} to process {JobId} failed after {ElapsedSeconds}",
      job.Attempt,
      job.Id,
      elapsedSeconds
    );
  }

  /// <summary>
  /// Creates an <see cref="IClient"/> for the account described by the job's server URL and token.
  /// </summary>
  private async Task<IClient> SetupClient(FileimportJob job, CancellationToken cancellationToken)
  {
    var account = await accountFactory.CreateAccount(
      job.Payload.ServerUrl,
      job.Payload.Token,
      cancellationToken: cancellationToken
    );
    return clientFactory.Create(account);
  }

  /// <summary>
  /// Processes a single job end-to-end and reports its outcome to the server.
  /// </summary>
  /// <remarks>
  /// <para>Steps: create a client, reject jobs past their max attempts, then run the job with a timeout.</para>
  /// <para>Outcome handling:</para>
  /// <list type="bullet">
  /// <item>Service shutdown: the job is re-queued.</item>
  /// <item>Cancellation request: the ingestion is reported as cancelled (unless it is already failed).</item>
  /// <item>Any other error: the ingestion is reported as failed.</item>
  /// <item>Reporting itself fails: the job is returned to the queue.</item>
  /// </list>
  /// <para>Exceptions are not rethrown, except fatal ones raised while reporting.</para>
  /// <para>Elapsed time is always deducted from the job's compute budget.</para>
  /// </remarks>
  [SuppressMessage("Design", "CA1031:Do not catch general exception types")]
  private async Task AttemptJob(FileimportJob job, IDbConnection connection, CancellationToken serviceCancellationToken)
  {
    using var activity = activityFactory.Start();
    IClient? speckleClient = null;
    Stopwatch stopwatch = Stopwatch.StartNew();
    double totalElapsedSeconds = 0;
    try
    {
      speckleClient = await SetupClient(job, serviceCancellationToken);
      using var userScope = UserActivityScope.AddUserScope(speckleClient.Account);
      if (job.Attempt > job.MaxAttempt)
      {
        //something went wrong, it should have been marked as failed
        throw new MaxAttemptsExceededException("Unhandled error silently failed the job multiple times");
      }

      await ExecuteJobWithTimeout(job, speckleClient, serviceCancellationToken);
      totalElapsedSeconds = stopwatch.Elapsed.TotalSeconds;
      activity?.SetStatus(SdkActivityStatusCode.Ok);
    }
    catch (Exception ex)
    {
      activity?.RecordException(ex);
      activity?.SetStatus(SdkActivityStatusCode.Error);
      totalElapsedSeconds = stopwatch.Elapsed.TotalSeconds;
      try
      {
        // If SetupClient itself failed there is no client to report with; this throws and
        // falls through to the re-queue handling below.
        speckleClient.NotNull();
        switch (ex)
        {
          case OperationCanceledException when serviceCancellationToken.IsCancellationRequested:
            // Windows service shut down, re-queue job
            logger.LogWarning(
              ex,
              "Re-enqueueing {JobId} because it was interrupted by the windows service stopping",
              job.Id
            );
            await repository.ReturnJobToQueued(connection, job.Id, CancellationToken.None); //this behaviour needs to be kept aligned with the server's GC behaviour
            await speckleClient.Ingestion.Requeue(
              new(job.Payload.ModelIngestionId, job.Payload.ProjectId, "Re-enqueuing job"),
              CancellationToken.None
            );
            break;
          case IngestionCancelledException { Ingestion.statusData.status: ModelIngestionStatus.failed }:
            // Server GC will fail inactive jobs AND request cancel (despite it not being an explicit user cancel request)
            // since the job is already in failed status, we don't need to try and move it to Canceled status
            break;
          case IngestionCancelledException:
            await ReportCancelled(job, speckleClient, ex, totalElapsedSeconds);
            break;
          default:
            await ReportFailed(job, speckleClient, ex, totalElapsedSeconds, serviceCancellationToken);
            break;
        }
      }
      catch (Exception ex2)
      {
        logger.LogError(new AggregateException(ex, ex2), "Failed to report failure status");
        // somehow we're in a weird state,
        // let's return the job to the queued state where it will get picked up again until one of total timeout,
        // max attempts, or exhausted compute budget is reached.
        // The server is responsible for garbage collecting jobs which have reached these error conditions and moving
        // them to a failed status.
        await repository.ReturnJobToQueued(connection, job.Id, CancellationToken.None);
        if (ex2.IsFatal())
        {
          throw;
        }
      }
    }
    finally
    {
      speckleClient?.Dispose();
      if (totalElapsedSeconds <= 0)
      {
        totalElapsedSeconds = stopwatch.Elapsed.TotalSeconds;
      }

      await repository.DeductFromComputeBudget(connection, job.Id, (long)totalElapsedSeconds, CancellationToken.None);
    }
  }

  /// <summary>
  /// Runs <paramref name="job"/> through the job handler under three limits:
  /// the job's timeout, server-side cancellation requests for its model ingestion, and
  /// <paramref name="cancellationToken"/>.
  /// </summary>
  /// <remarks>The timeout is capped by the job's remaining compute budget.</remarks>
  /// <param name="job">The job to process.</param>
  /// <param name="client">Client connected to the job's server; used to read and watch the model ingestion.</param>
  /// <param name="cancellationToken">Service-level cancellation (e.g. the Windows service stopping).</param>
  /// <returns>The handler's result (rootObjectId if attempt was successful).</returns>
  /// <exception cref="IngestionCancelledException">
  /// Cancellation of the ingestion was requested, either before processing started or while it was running.
  /// </exception>
  /// <exception cref="JobTimeoutException">The timeout elapsed before processing completed.</exception>
  /// <exception cref="OperationCanceledException">
  /// <paramref name="cancellationToken"/> was cancelled and neither of the above conditions applies.
  /// </exception>
  private async Task<string> ExecuteJobWithTimeout(
    FileimportJob job,
    IClient client,
    CancellationToken cancellationToken
  )
  {
    ModelIngestion ingestion = await client.Ingestion.Get(
      job.Payload.ModelIngestionId,
      job.Payload.ProjectId,
      cancellationToken
    );

    //respect the remaining compute budget
    int jobTimeout = Math.Max(0, Math.Min(job.Payload.TimeOutSeconds, job.RemainingComputeBudgetSeconds));
    using CancellationTokenSource timeout = new(TimeSpan.FromSeconds(jobTimeout));
    using CancellationTokenSource ingestionCancelled = new();
    using var subscription = client.Subscription.CreateProjectModelIngestionCancellationRequestedSubscription(
      job.Payload.ModelIngestionId,
      job.Payload.ProjectId
    );
    subscription.Listeners += (_, e) =>
    {
      logger.LogInformation(
        "Cancellation of {ModelIngestionId} has been requested via {Type} update ({IsCancellationRequested})",
        e.modelIngestion.id,
        e.type,
        e.modelIngestion.cancellationRequested
      );
      ingestion = e.modelIngestion;
      ingestionCancelled.Cancel();
    };
    using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(
      timeout.Token,
      ingestionCancelled.Token,
      cancellationToken
    );

    try
    {
      // A subscription can only deliver requests raised after it is established. A cancellation requested
      // earlier (e.g. while the job was still queued) is only visible on the ingestion fetched above, so honour it
      // here rather than processing a job the user has already asked to cancel.
      if (ingestion.cancellationRequested is true)
      {
        logger.LogInformation(
          "Cancellation of {ModelIngestionId} was already requested before processing started",
          job.Payload.ModelIngestionId
        );
        ingestionCancelled.Cancel();
      }

      // Fail fast (mapped by the filters below) if any limit is already hit, e.g. a zero remaining budget.
      linkedSource.Token.ThrowIfCancellationRequested();
      return await jobHandler.ProcessJob(job, client, ingestion, linkedSource.Token);
    }
    catch (OperationCanceledException ex) when (ingestionCancelled.IsCancellationRequested)
    {
      throw new IngestionCancelledException("Ingestion cancellation was requested", ex) { Ingestion = ingestion };
    }
    catch (OperationCanceledException ex) when (timeout.IsCancellationRequested)
    {
      throw new JobTimeoutException($"Job was cancelled due to reaching the {jobTimeout} second timeout", ex);
    }
  }
}