From 070f21b07500086d64c7db80176cfc7de04d9c79 Mon Sep 17 00:00:00 2001 From: Jedd Morgan <45512892+JR-Morgan@users.noreply.github.com> Date: Thu, 26 Feb 2026 10:45:48 +0000 Subject: [PATCH] feat(progress): Add progress reporting to uploader (#446) * refactor uploader for progress * progress * not so many decimal places * small tweak to RenderStreamProgress * fix unit tests * uploading data --- .../Helpers/StopwatchPollyfills.cs | 21 +++++ .../Pipelines/Progress/AggregateProgress.cs | 12 +++ .../Progress/IngestionProgressManager.cs | 89 +++++++++++++++++++ .../IngestionProgressManagerFactory.cs | 22 +++++ .../Pipelines/Progress/ProgressArgs.cs | 6 ++ .../{ => Progress}/ProgressStream.cs | 16 ++-- .../Progress/RenderedStreamProgress.cs | 40 +++++++++ src/Speckle.Sdk/Pipelines/Send/DiskStore.cs | 77 ++++++++++++++++ .../Pipelines/Send/SendPipeline.cs | 30 +++++-- src/Speckle.Sdk/Pipelines/Send/Uploader.cs | 78 ++++------------ .../Pipelines/Send/UploaderDTOs.cs | 5 -- src/Speckle.Sdk/ServiceRegistration.cs | 4 +- 12 files changed, 313 insertions(+), 87 deletions(-) create mode 100644 src/Speckle.Sdk/Helpers/StopwatchPollyfills.cs create mode 100644 src/Speckle.Sdk/Pipelines/Progress/AggregateProgress.cs create mode 100644 src/Speckle.Sdk/Pipelines/Progress/IngestionProgressManager.cs create mode 100644 src/Speckle.Sdk/Pipelines/Progress/IngestionProgressManagerFactory.cs create mode 100644 src/Speckle.Sdk/Pipelines/Progress/ProgressArgs.cs rename src/Speckle.Sdk/Pipelines/{ => Progress}/ProgressStream.cs (87%) create mode 100644 src/Speckle.Sdk/Pipelines/Progress/RenderedStreamProgress.cs create mode 100644 src/Speckle.Sdk/Pipelines/Send/DiskStore.cs diff --git a/src/Speckle.Sdk/Helpers/StopwatchPollyfills.cs b/src/Speckle.Sdk/Helpers/StopwatchPollyfills.cs new file mode 100644 index 00000000..83dd0e05 --- /dev/null +++ b/src/Speckle.Sdk/Helpers/StopwatchPollyfills.cs @@ -0,0 +1,21 @@ +using System.Diagnostics; + +namespace Speckle.Sdk.Helpers; + +public static class StopwatchPolyfills +{ +#if !NET7_0_OR_GREATER + private static readonly double s_tickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency; +#endif + + public static TimeSpan GetElapsedTime(long startingTimestamp) + { +#if NET7_0_OR_GREATER + return Stopwatch.GetElapsedTime(startingTimestamp); +#else + + long elapsedTicks = Stopwatch.GetTimestamp() - startingTimestamp; + return new TimeSpan((long)(elapsedTicks * s_tickFrequency)); +#endif + } +} diff --git a/src/Speckle.Sdk/Pipelines/Progress/AggregateProgress.cs b/src/Speckle.Sdk/Pipelines/Progress/AggregateProgress.cs new file mode 100644 index 00000000..74dfa327 --- /dev/null +++ b/src/Speckle.Sdk/Pipelines/Progress/AggregateProgress.cs @@ -0,0 +1,12 @@ +namespace Speckle.Sdk.Pipelines.Progress; + +public sealed class AggregateProgress(params IProgress[] progresses) : IProgress +{ + public void Report(T value) + { + foreach (var progress in progresses) + { + progress.Report(value); + } + } +} diff --git a/src/Speckle.Sdk/Pipelines/Progress/IngestionProgressManager.cs b/src/Speckle.Sdk/Pipelines/Progress/IngestionProgressManager.cs new file mode 100644 index 00000000..be141c65 --- /dev/null +++ b/src/Speckle.Sdk/Pipelines/Progress/IngestionProgressManager.cs @@ -0,0 +1,89 @@ +using System.Diagnostics; +using Microsoft.Extensions.Logging; +using Speckle.InterfaceGenerator; +using Speckle.Sdk.Api; +using Speckle.Sdk.Api.GraphQL.Inputs; +using Speckle.Sdk.Api.GraphQL.Models; +using Speckle.Sdk.Helpers; + +namespace Speckle.Sdk.Pipelines.Progress; + +public partial interface IIngestionProgressManager : IProgress; + +/// +/// An implementation for the entire client side Ingestion progress update reporting +/// Will throttles ingestion progress messages and reports their progress +/// +[GenerateAutoInterface] +public sealed class IngestionProgressManager( + ILogger logger, + IClient speckleClient, + ModelIngestion ingestion, + string projectId, + TimeSpan updateInterval, + CancellationToken cancellationToken +) : IIngestionProgressManager +{ + /// + /// Normally we would pick quite a coarse throttle window to try and avoid over pressure (1-5s) + /// + private Task? _lastUpdate; + private long _lastUpdatedAt; + private readonly object _lock = new(); + + [AutoInterfaceIgnore] + public void Report(CardProgress value) + { + cancellationToken.ThrowIfCancellationRequested(); + + string trimmedMessage; + lock (_lock) + { + if (ShouldIgnoreProgressUpdate()) + { + return; + } + + _lastUpdatedAt = Stopwatch.GetTimestamp(); + + trimmedMessage = value.Status.TrimEnd('.'); + + _lastUpdate = speckleClient + .Ingestion.UpdateProgress( + new ModelIngestionUpdateInput(ingestion.id, projectId, trimmedMessage, value.Progress), + cancellationToken + ) + .ContinueWith( + HandleFaultedContinuation, + CancellationToken.None, + TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default + ); + } + + logger.LogInformation("Progress update {Message} {Progress}", trimmedMessage, value.Progress); + } + + /// if the update should be ignored, otherwise + private bool ShouldIgnoreProgressUpdate() + { + if (_lastUpdate is not null && !_lastUpdate.IsCompleted) + { + return true; + } + + TimeSpan msSinceLastUpdate = StopwatchPolyfills.GetElapsedTime(_lastUpdatedAt); + return msSinceLastUpdate < updateInterval; + } + + private void HandleFaultedContinuation(Task updateTask) + { + // The progress report failed... could be many reasons. + // For now, we're not letting this fail the Ingestion in any way + // we'll log but otherwise let it slide while leaving no unobserved task exceptions + if (updateTask.IsFaulted) + { + logger.LogWarning(updateTask.Exception, "A progress update failed unexpectedly"); + } + } +} diff --git a/src/Speckle.Sdk/Pipelines/Progress/IngestionProgressManagerFactory.cs b/src/Speckle.Sdk/Pipelines/Progress/IngestionProgressManagerFactory.cs new file mode 100644 index 00000000..998431c2 --- /dev/null +++ b/src/Speckle.Sdk/Pipelines/Progress/IngestionProgressManagerFactory.cs @@ -0,0 +1,22 @@ +using Microsoft.Extensions.Logging; +using Speckle.InterfaceGenerator; +using Speckle.Sdk.Api; +using Speckle.Sdk.Api.GraphQL.Models; + +namespace Speckle.Sdk.Pipelines.Progress; + +[GenerateAutoInterface] +public sealed class IngestionProgressManagerFactory(ILogger logger) + : IIngestionProgressManagerFactory +{ + public IIngestionProgressManager CreateInstance( + IClient speckleClient, + ModelIngestion ingestion, + string projectId, + TimeSpan updateInterval, + CancellationToken cancellationToken + ) + { + return new IngestionProgressManager(logger, speckleClient, ingestion, projectId, updateInterval, cancellationToken); + } +} diff --git a/src/Speckle.Sdk/Pipelines/Progress/ProgressArgs.cs b/src/Speckle.Sdk/Pipelines/Progress/ProgressArgs.cs new file mode 100644 index 00000000..bfc47dbc --- /dev/null +++ b/src/Speckle.Sdk/Pipelines/Progress/ProgressArgs.cs @@ -0,0 +1,6 @@ +namespace Speckle.Sdk.Pipelines.Progress; + +//TODO: rename PipelineProgressArgs +public readonly record struct CardProgress(string Status, double? Progress); + +public readonly record struct StreamProgressArgs(long BytesStreamed, long ExpectedTotalBytes); diff --git a/src/Speckle.Sdk/Pipelines/ProgressStream.cs b/src/Speckle.Sdk/Pipelines/Progress/ProgressStream.cs similarity index 87% rename from src/Speckle.Sdk/Pipelines/ProgressStream.cs rename to src/Speckle.Sdk/Pipelines/Progress/ProgressStream.cs index b23998ae..f75c7837 100644 --- a/src/Speckle.Sdk/Pipelines/ProgressStream.cs +++ b/src/Speckle.Sdk/Pipelines/Progress/ProgressStream.cs @@ -1,23 +1,17 @@ using System.Diagnostics.CodeAnalysis; -namespace Speckle.Sdk.Pipelines; - -public readonly record struct StreamProgressArgs(long BytesStreamed, long ExpectedTotalBytes); +namespace Speckle.Sdk.Pipelines.Progress; /// -/// Wraps a stream to report upload progress as bytes are read. +/// Wraps to report streaming progress as bytes are read/written. /// -public sealed class ProgressStream( - Stream innerStream, - long expectedTotalBytesStreamed, - IProgress? progress = null -) : Stream +public sealed class ProgressStream(Stream innerStream, IProgress? progress = null) : Stream { private long _bytesStreamed; public override bool CanRead => innerStream.CanRead; public override bool CanSeek => innerStream.CanSeek; - public override bool CanWrite => false; + public override bool CanWrite => innerStream.CanWrite; public override long Length => innerStream.Length; public override long Position @@ -57,7 +51,7 @@ public sealed class ProgressStream( private void ReportProgress(int newBytesProcessed) { _bytesStreamed += newBytesProcessed; - progress?.Report(new(_bytesStreamed, expectedTotalBytesStreamed)); + progress?.Report(new(_bytesStreamed, Length)); } public override void Flush() => innerStream.Flush(); diff --git a/src/Speckle.Sdk/Pipelines/Progress/RenderedStreamProgress.cs b/src/Speckle.Sdk/Pipelines/Progress/RenderedStreamProgress.cs new file mode 100644 index 00000000..81f41d6c --- /dev/null +++ b/src/Speckle.Sdk/Pipelines/Progress/RenderedStreamProgress.cs @@ -0,0 +1,40 @@ +namespace Speckle.Sdk.Pipelines.Progress; + +/// +/// Renders "low level" data stream updates +/// into "high level" that is expected by Ingestion progress and DUI3 +/// +/// +public sealed class RenderedStreamProgress(IProgress progress) : IProgress +{ + public void Report(StreamProgressArgs value) + { + var (suffix, scaleFactor) = GetFileSizeRendering(value.ExpectedTotalBytes); + progress.Report( + new( + $"Uploading data... ({value.BytesStreamed * scaleFactor:F1}/{value.ExpectedTotalBytes * scaleFactor:F1} {suffix})", + (double)value.BytesStreamed / value.ExpectedTotalBytes + ) + ); + } + + private static readonly string[] s_suffixes = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"]; + + private static (string suffix, double scaleFactor) GetFileSizeRendering(long value) + { + if (value <= 0) + { + return (s_suffixes[0], 1d); + } + + for (int i = 0; i < s_suffixes.Length; i++) + { + if (value <= Math.Pow(1024, i + 1)) + { + return (s_suffixes[i], 1 / Math.Pow(1024, i)); + } + } + + throw new ArgumentOutOfRangeException(nameof(value), "Value is too large to convert to a file size"); + } +} diff --git a/src/Speckle.Sdk/Pipelines/Send/DiskStore.cs b/src/Speckle.Sdk/Pipelines/Send/DiskStore.cs new file mode 100644 index 00000000..0ac685fb --- /dev/null +++ b/src/Speckle.Sdk/Pipelines/Send/DiskStore.cs @@ -0,0 +1,77 @@ +using System.IO.Compression; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using Speckle.InterfaceGenerator; +using Speckle.Sdk.Helpers; + +namespace Speckle.Sdk.Pipelines.Send; + +[GenerateAutoInterface] +public sealed class DiskStoreFactory(ILogger logger) : IDiskStoreFactory +{ + public DiskStore CreateInstance(CancellationToken cancellationToken) => new(logger, cancellationToken); +} + +public sealed class DiskStore +{ + private readonly Channel _channel; + private readonly Task _writeToDiskTask; + private readonly ILogger _logger; + private readonly CancellationToken _cancellationToken; + + internal DiskStore(ILogger logger, CancellationToken cancellationToken) + { + _logger = logger; + _cancellationToken = cancellationToken; + + _channel = Channel.CreateBounded( + new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.Wait, SingleReader = true } + ); + _writeToDiskTask = Task.Run(WriteFile, cancellationToken); + } + + public ValueTask PushAsync(UploadItem item) => _channel.Writer.WriteAsync(item, _cancellationToken); + + public async Task CompleteAsync() + { + _channel.Writer.Complete(); + return await _writeToDiskTask.ConfigureAwait(false); + } + + /// + /// Reads from the Channel and streams the s to a temporary file on disk. + /// Will keep reading until is called. + /// + /// the file that was written + private async Task WriteFile() + { + string tempFilePath = Path.GetTempFileName(); + var tempFile = new DisposableFile(new FileInfo(tempFilePath), _logger); + _logger.LogInformation("Writing temp file to {TempFilePath}", tempFilePath); + + try + { + using var fileStream = new FileStream(tempFilePath, FileMode.Create, FileAccess.Write, FileShare.None); + using var gzip = new GZipStream(fileStream, CompressionLevel.Optimal); + using var writer = new StreamWriter(gzip); + + await foreach (var item in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) + { + await writer.WriteLineAsync($"{item.Id}\t{item.Json}\t{item.SpeckleType}").ConfigureAwait(false); + } +#if NET8_0_OR_GREATER + await writer.FlushAsync(_cancellationToken).ConfigureAwait(false); +#else + await writer.FlushAsync().ConfigureAwait(false); +#endif + tempFile.FileInfo.Refresh(); + + return tempFile; + } + catch + { + tempFile.Dispose(); + throw; + } + } +} diff --git a/src/Speckle.Sdk/Pipelines/Send/SendPipeline.cs b/src/Speckle.Sdk/Pipelines/Send/SendPipeline.cs index e1a97a40..e00439ff 100644 --- a/src/Speckle.Sdk/Pipelines/Send/SendPipeline.cs +++ b/src/Speckle.Sdk/Pipelines/Send/SendPipeline.cs @@ -1,21 +1,26 @@ using Speckle.InterfaceGenerator; using Speckle.Sdk.Credentials; +using Speckle.Sdk.Helpers; using Speckle.Sdk.Models; +using Speckle.Sdk.Pipelines.Progress; namespace Speckle.Sdk.Pipelines.Send; [GenerateAutoInterface] -public sealed class SendPipelineFactory(IUploaderFactory uploaderFactory) : ISendPipelineFactory +public sealed class SendPipelineFactory(IUploaderFactory uploaderFactory, IDiskStoreFactory diskStoreFactory) + : ISendPipelineFactory { public SendPipeline CreateInstance( string projectId, string ingestionId, Account account, + IProgress uploadProgress, CancellationToken cancellationToken ) { - var uploader = uploaderFactory.CreateInstance(projectId, ingestionId, account, cancellationToken); - return new SendPipeline(uploader); + var uploader = uploaderFactory.CreateInstance(projectId, ingestionId, account, uploadProgress, cancellationToken); + var diskStore = diskStoreFactory.CreateInstance(cancellationToken); + return new SendPipeline(uploader, diskStore); } } @@ -23,10 +28,12 @@ public sealed class SendPipeline : IDisposable { private readonly Serializer _serializer = new(); private readonly Uploader _uploader; + private readonly DiskStore _diskStore; - internal SendPipeline(Uploader uploader) + internal SendPipeline(Uploader uploader, DiskStore diskStore) { _uploader = uploader; + _diskStore = diskStore; } private UploadItem _lastItem; @@ -38,7 +45,7 @@ public sealed class SendPipeline : IDisposable foreach (var item in results) { // we're not doing fire and forget here so that we get the backpressure from the uploader - await _uploader.PushAsync(item).ConfigureAwait(false); + await _diskStore.PushAsync(item).ConfigureAwait(false); } // NOTE: this is important to keep track of. When we serialze an object, we get back a list of objects, with the first one being the original root. @@ -50,8 +57,17 @@ public sealed class SendPipeline : IDisposable public async Task WaitForUpload() { - await _uploader.PushAsync(_lastItem).ConfigureAwait(false); - await _uploader.CompleteAsync().ConfigureAwait(false); + await _diskStore.PushAsync(_lastItem).ConfigureAwait(false); + using DisposableFile tempFile = await _diskStore.CompleteAsync().ConfigureAwait(false); + + using Stream fileStreamUpload = new FileStream( + tempFile.FileInfo.FullName, + FileMode.Open, + FileAccess.Read, + FileShare.Read + ); + + await _uploader.Send(fileStreamUpload).ConfigureAwait(false); } public async Task WaitForUploadAndServerProcessing() diff --git a/src/Speckle.Sdk/Pipelines/Send/Uploader.cs b/src/Speckle.Sdk/Pipelines/Send/Uploader.cs index 1cd96f13..63a17881 100644 --- a/src/Speckle.Sdk/Pipelines/Send/Uploader.cs +++ b/src/Speckle.Sdk/Pipelines/Send/Uploader.cs @@ -1,11 +1,10 @@ -using System.IO.Compression; using System.Net.Http.Headers; -using System.Threading.Channels; using Microsoft.Extensions.Logging; using Speckle.InterfaceGenerator; using Speckle.Newtonsoft.Json; using Speckle.Sdk.Credentials; using Speckle.Sdk.Helpers; +using Speckle.Sdk.Pipelines.Progress; namespace Speckle.Sdk.Pipelines.Send; @@ -16,8 +15,9 @@ public sealed class UploaderFactory(ISpeckleHttp httpClientFactory, ILogger progress, CancellationToken cancellationToken - ) => new(projectId, ingestionId, logger, httpClientFactory, account, cancellationToken); + ) => new(projectId, ingestionId, logger, httpClientFactory, account, progress, cancellationToken); } public sealed class Uploader : IDisposable @@ -27,9 +27,8 @@ public sealed class Uploader : IDisposable private readonly CancellationToken _cancellationToken; private readonly HttpClient _speckleClient; private readonly HttpClient _s3Client; - private readonly Channel _channel; - private readonly Task _sendTask; private readonly ILogger _logger; + private readonly IProgress _progress; internal Uploader( string projectId, @@ -37,6 +36,7 @@ public sealed class Uploader : IDisposable ILogger logger, ISpeckleHttp httpClientFactory, Account speckleAccount, + IProgress progress, CancellationToken cancellationToken ) { @@ -44,65 +44,19 @@ public sealed class Uploader : IDisposable _ingestionId = ingestionId; _logger = logger; _cancellationToken = cancellationToken; - - _speckleClient = httpClientFactory.CreateHttpClient( - null, - (int)TimeSpan.FromMinutes(30).TotalSeconds, - speckleAccount.token - ); + _progress = progress; + _speckleClient = httpClientFactory.CreateHttpClient(authorizationToken: speckleAccount.token); _speckleClient.BaseAddress = new(new(speckleAccount.serverInfo.url), "/api/v1/"); _s3Client = httpClientFactory.CreateHttpClient(); - - _channel = Channel.CreateBounded( - new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.Wait, SingleReader = true } - ); - - _sendTask = Task.Run(SendLoopAsync, cancellationToken); } - public ValueTask PushAsync(UploadItem item) => _channel.Writer.WriteAsync(item, _cancellationToken); - - public async Task CompleteAsync() + public async Task Send(Stream fileStream) { - _channel.Writer.Complete(); - var result = await _sendTask.ConfigureAwait(false); - return result.IngestionId; - } - - private async Task SendLoopAsync() - { - using DisposableFile tempFile = await WriteFile().ConfigureAwait(false); - PresignedUploadResponse presignedUploadResponse = await GetPresignedUrl().ConfigureAwait(false); - await UploadToS3(tempFile.FileInfo, presignedUploadResponse).ConfigureAwait(false); + await UploadToS3(fileStream, presignedUploadResponse).ConfigureAwait(false); - return await TriggerProcessing().ConfigureAwait(false); - } - - /// - /// Reads from the Channel and streams the s to a temporary file on disk. - /// Will keep reading until is called. - /// - /// the file that was written - private async Task WriteFile() - { - string tempFilePath = Path.GetTempFileName(); - _logger.LogInformation("Writing temp file to {TempFilePath}", tempFilePath); - - using var fileStream = new FileStream(tempFilePath, FileMode.Create, FileAccess.Write, FileShare.None); - using var gzip = new GZipStream(fileStream, CompressionLevel.Optimal); - using var writer = new StreamWriter(gzip); - await foreach (var item in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) - { - await writer.WriteLineAsync($"{item.Id}\t{item.Json}\t{item.SpeckleType}").ConfigureAwait(false); - } -#if NET8_0_OR_GREATER - await writer.FlushAsync(_cancellationToken).ConfigureAwait(false); -#else - await writer.FlushAsync().ConfigureAwait(false); -#endif - return new DisposableFile(new FileInfo(tempFilePath), _logger); + await TriggerProcessing().ConfigureAwait(false); } private async Task GetPresignedUrl() @@ -123,15 +77,15 @@ public sealed class Uploader : IDisposable return presignedUpload; } - private async Task UploadToS3(FileInfo file, PresignedUploadResponse presignedUploadResponse) + private async Task UploadToS3(Stream fileStream, PresignedUploadResponse presignedUploadResponse) { - using var fileStreamUpload = new FileStream(file.FullName, FileMode.Open, FileAccess.Read, FileShare.Read); + _logger.LogInformation("Uploading file to pre-signed url"); - Stream progressStream = fileStreamUpload; // TODO: wrap with progress stream + Stream progressStream = new ProgressStream(fileStream, _progress); using var streamContent = new StreamContent(progressStream); streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream"); - streamContent.Headers.ContentLength = file.Length; + streamContent.Headers.ContentLength = fileStream.Length; using var uploadRequest = new HttpRequestMessage(HttpMethod.Put, presignedUploadResponse.Url); foreach (var kvp in presignedUploadResponse.AdditionalRequestHeaders) @@ -148,7 +102,7 @@ public sealed class Uploader : IDisposable uploadResponse.EnsureSuccessStatusCode(); } - private async Task TriggerProcessing() + private async Task TriggerProcessing() { Uri processUri = new($"projects/{_projectId}/modelingestion/{_ingestionId}/uploads/process", UriKind.Relative); @@ -157,8 +111,6 @@ public sealed class Uploader : IDisposable .ConfigureAwait(false); processResponse.EnsureSuccessStatusCode(); - - return new UploadResult { IngestionId = _ingestionId }; } public void Dispose() diff --git a/src/Speckle.Sdk/Pipelines/Send/UploaderDTOs.cs b/src/Speckle.Sdk/Pipelines/Send/UploaderDTOs.cs index cff5775b..8ff9a546 100644 --- a/src/Speckle.Sdk/Pipelines/Send/UploaderDTOs.cs +++ b/src/Speckle.Sdk/Pipelines/Send/UploaderDTOs.cs @@ -16,8 +16,3 @@ internal record ProcessUploadResponse { public required string ingestionId { get; init; } } - -internal record UploadResult -{ - public required string IngestionId { get; init; } -} diff --git a/src/Speckle.Sdk/ServiceRegistration.cs b/src/Speckle.Sdk/ServiceRegistration.cs index 94e147cc..9028ae7c 100644 --- a/src/Speckle.Sdk/ServiceRegistration.cs +++ b/src/Speckle.Sdk/ServiceRegistration.cs @@ -8,6 +8,7 @@ using Speckle.Sdk.Dependencies; using Speckle.Sdk.Host; using Speckle.Sdk.Logging; using Speckle.Sdk.Models.GraphTraversal; +using Speckle.Sdk.Pipelines.Progress; using Speckle.Sdk.Serialisation.V2; using Speckle.Sdk.Serialisation.V2.Receive; using Speckle.Sdk.Serialisation.V2.Send; @@ -96,7 +97,8 @@ public static class ServiceRegistration typeof(DeserializeProcess), typeof(ObjectLoader), typeof(TraversalRule), - typeof(Client) + typeof(Client), + typeof(IngestionProgressManager) ); serviceCollection.AddMatchingInterfacesAsTransient(typeof(GraphQLRetry).Assembly); return serviceCollection;