feat(progress): Add progress reporting to uploader (#446)
.NET Build and Publish / build (push) Has been cancelled

* refactor uploader for progress

* progress

* not so many decimal places

* small tweak to RenderStreamProgress

* fix unit tests

* uploading data
This commit is contained in:
Jedd Morgan
2026-02-26 10:45:48 +00:00
committed by GitHub
parent 9bf6995b15
commit 070f21b075
12 changed files with 313 additions and 87 deletions
@@ -0,0 +1,21 @@
using System.Diagnostics;
namespace Speckle.Sdk.Helpers;
public static class StopwatchPolyfills
{
#if !NET7_0_OR_GREATER
private static readonly double s_tickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;
#endif
public static TimeSpan GetElapsedTime(long startingTimestamp)
{
#if NET7_0_OR_GREATER
return Stopwatch.GetElapsedTime(startingTimestamp);
#else
long elapsedTicks = Stopwatch.GetTimestamp() - startingTimestamp;
return new TimeSpan((long)(elapsedTicks * s_tickFrequency));
#endif
}
}
@@ -0,0 +1,12 @@
namespace Speckle.Sdk.Pipelines.Progress;
public sealed class AggregateProgress<T>(params IProgress<T>[] progresses) : IProgress<T>
{
public void Report(T value)
{
foreach (var progress in progresses)
{
progress.Report(value);
}
}
}
@@ -0,0 +1,89 @@
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Api;
using Speckle.Sdk.Api.GraphQL.Inputs;
using Speckle.Sdk.Api.GraphQL.Models;
using Speckle.Sdk.Helpers;
namespace Speckle.Sdk.Pipelines.Progress;
public partial interface IIngestionProgressManager : IProgress<CardProgress>;
/// <summary>
/// An <see langword="IProgress{IngestionProgressEventArgs}"/> implementation for the entire client side Ingestion progress update reporting
/// Will throttles ingestion progress messages and reports their progress
/// </summary>
[GenerateAutoInterface]
public sealed class IngestionProgressManager(
ILogger<IngestionProgressManager> logger,
IClient speckleClient,
ModelIngestion ingestion,
string projectId,
TimeSpan updateInterval,
CancellationToken cancellationToken
) : IIngestionProgressManager
{
/// <remarks>
/// Normally we would pick quite a coarse throttle window to try and avoid over pressure (1-5s)
/// </remarks>
private Task? _lastUpdate;
private long _lastUpdatedAt;
private readonly object _lock = new();
[AutoInterfaceIgnore]
public void Report(CardProgress value)
{
cancellationToken.ThrowIfCancellationRequested();
string trimmedMessage;
lock (_lock)
{
if (ShouldIgnoreProgressUpdate())
{
return;
}
_lastUpdatedAt = Stopwatch.GetTimestamp();
trimmedMessage = value.Status.TrimEnd('.');
_lastUpdate = speckleClient
.Ingestion.UpdateProgress(
new ModelIngestionUpdateInput(ingestion.id, projectId, trimmedMessage, value.Progress),
cancellationToken
)
.ContinueWith(
HandleFaultedContinuation,
CancellationToken.None,
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default
);
}
logger.LogInformation("Progress update {Message} {Progress}", trimmedMessage, value.Progress);
}
/// <returns><see langword="true"/> if the update should be ignored, otherwise <see langword="false"/></returns>
private bool ShouldIgnoreProgressUpdate()
{
if (_lastUpdate is not null && !_lastUpdate.IsCompleted)
{
return true;
}
TimeSpan msSinceLastUpdate = StopwatchPolyfills.GetElapsedTime(_lastUpdatedAt);
return msSinceLastUpdate < updateInterval;
}
private void HandleFaultedContinuation(Task updateTask)
{
// The progress report failed... could be many reasons.
// For now, we're not letting this fail the Ingestion in any way
// we'll log but otherwise let it slide while leaving no unobserved task exceptions
if (updateTask.IsFaulted)
{
logger.LogWarning(updateTask.Exception, "A progress update failed unexpectedly");
}
}
}
@@ -0,0 +1,22 @@
using Microsoft.Extensions.Logging;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Api;
using Speckle.Sdk.Api.GraphQL.Models;
namespace Speckle.Sdk.Pipelines.Progress;
[GenerateAutoInterface]
public sealed class IngestionProgressManagerFactory(ILogger<IngestionProgressManager> logger)
: IIngestionProgressManagerFactory
{
public IIngestionProgressManager CreateInstance(
IClient speckleClient,
ModelIngestion ingestion,
string projectId,
TimeSpan updateInterval,
CancellationToken cancellationToken
)
{
return new IngestionProgressManager(logger, speckleClient, ingestion, projectId, updateInterval, cancellationToken);
}
}
@@ -0,0 +1,6 @@
namespace Speckle.Sdk.Pipelines.Progress;
//TODO: rename PipelineProgressArgs
public readonly record struct CardProgress(string Status, double? Progress);
public readonly record struct StreamProgressArgs(long BytesStreamed, long ExpectedTotalBytes);
@@ -1,23 +1,17 @@
using System.Diagnostics.CodeAnalysis;
namespace Speckle.Sdk.Pipelines;
public readonly record struct StreamProgressArgs(long BytesStreamed, long ExpectedTotalBytes);
namespace Speckle.Sdk.Pipelines.Progress;
/// <summary>
/// Wraps a stream to report upload progress as bytes are read.
/// Wraps <paramref name="innerStream"/> to report streaming progress as bytes are read/written.
/// </summary>
public sealed class ProgressStream(
Stream innerStream,
long expectedTotalBytesStreamed,
IProgress<StreamProgressArgs>? progress = null
) : Stream
public sealed class ProgressStream(Stream innerStream, IProgress<StreamProgressArgs>? progress = null) : Stream
{
private long _bytesStreamed;
public override bool CanRead => innerStream.CanRead;
public override bool CanSeek => innerStream.CanSeek;
public override bool CanWrite => false;
public override bool CanWrite => innerStream.CanWrite;
public override long Length => innerStream.Length;
public override long Position
@@ -57,7 +51,7 @@ public sealed class ProgressStream(
private void ReportProgress(int newBytesProcessed)
{
_bytesStreamed += newBytesProcessed;
progress?.Report(new(_bytesStreamed, expectedTotalBytesStreamed));
progress?.Report(new(_bytesStreamed, Length));
}
public override void Flush() => innerStream.Flush();
@@ -0,0 +1,40 @@
namespace Speckle.Sdk.Pipelines.Progress;
/// <summary>
/// Renders "low level" data stream updates
/// into "high level" <see cref="CardProgress"/> that is expected by Ingestion progress and DUI3
/// </summary>
/// <param name="progress"></param>
public sealed class RenderedStreamProgress(IProgress<CardProgress> progress) : IProgress<StreamProgressArgs>
{
public void Report(StreamProgressArgs value)
{
var (suffix, scaleFactor) = GetFileSizeRendering(value.ExpectedTotalBytes);
progress.Report(
new(
$"Uploading data... ({value.BytesStreamed * scaleFactor:F1}/{value.ExpectedTotalBytes * scaleFactor:F1} {suffix})",
(double)value.BytesStreamed / value.ExpectedTotalBytes
)
);
}
private static readonly string[] s_suffixes = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"];
private static (string suffix, double scaleFactor) GetFileSizeRendering(long value)
{
if (value <= 0)
{
return (s_suffixes[0], 1d);
}
for (int i = 0; i < s_suffixes.Length; i++)
{
if (value <= Math.Pow(1024, i + 1))
{
return (s_suffixes[i], 1 / Math.Pow(1024, i));
}
}
throw new ArgumentOutOfRangeException(nameof(value), "Value is too large to convert to a file size");
}
}
@@ -0,0 +1,77 @@
using System.IO.Compression;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Helpers;
namespace Speckle.Sdk.Pipelines.Send;
[GenerateAutoInterface]
public sealed class DiskStoreFactory(ILogger<DiskStore> logger) : IDiskStoreFactory
{
public DiskStore CreateInstance(CancellationToken cancellationToken) => new(logger, cancellationToken);
}
public sealed class DiskStore
{
private readonly Channel<UploadItem> _channel;
private readonly Task<DisposableFile> _writeToDiskTask;
private readonly ILogger<DiskStore> _logger;
private readonly CancellationToken _cancellationToken;
internal DiskStore(ILogger<DiskStore> logger, CancellationToken cancellationToken)
{
_logger = logger;
_cancellationToken = cancellationToken;
_channel = Channel.CreateBounded<UploadItem>(
new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.Wait, SingleReader = true }
);
_writeToDiskTask = Task.Run(WriteFile, cancellationToken);
}
public ValueTask PushAsync(UploadItem item) => _channel.Writer.WriteAsync(item, _cancellationToken);
public async Task<DisposableFile> CompleteAsync()
{
_channel.Writer.Complete();
return await _writeToDiskTask.ConfigureAwait(false);
}
/// <summary>
/// Reads from the Channel and streams the <see cref="UploadItem"/>s to a temporary file on disk.
/// Will keep reading until <see cref="CompleteAsync"/> is called.
/// </summary>
/// <returns>the file that was written</returns>
private async Task<DisposableFile> WriteFile()
{
string tempFilePath = Path.GetTempFileName();
var tempFile = new DisposableFile(new FileInfo(tempFilePath), _logger);
_logger.LogInformation("Writing temp file to {TempFilePath}", tempFilePath);
try
{
using var fileStream = new FileStream(tempFilePath, FileMode.Create, FileAccess.Write, FileShare.None);
using var gzip = new GZipStream(fileStream, CompressionLevel.Optimal);
using var writer = new StreamWriter(gzip);
await foreach (var item in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false))
{
await writer.WriteLineAsync($"{item.Id}\t{item.Json}\t{item.SpeckleType}").ConfigureAwait(false);
}
#if NET8_0_OR_GREATER
await writer.FlushAsync(_cancellationToken).ConfigureAwait(false);
#else
await writer.FlushAsync().ConfigureAwait(false);
#endif
tempFile.FileInfo.Refresh();
return tempFile;
}
catch
{
tempFile.Dispose();
throw;
}
}
}
+23 -7
View File
@@ -1,21 +1,26 @@
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Credentials;
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Models;
using Speckle.Sdk.Pipelines.Progress;
namespace Speckle.Sdk.Pipelines.Send;
[GenerateAutoInterface]
public sealed class SendPipelineFactory(IUploaderFactory uploaderFactory) : ISendPipelineFactory
public sealed class SendPipelineFactory(IUploaderFactory uploaderFactory, IDiskStoreFactory diskStoreFactory)
: ISendPipelineFactory
{
public SendPipeline CreateInstance(
string projectId,
string ingestionId,
Account account,
IProgress<StreamProgressArgs> uploadProgress,
CancellationToken cancellationToken
)
{
var uploader = uploaderFactory.CreateInstance(projectId, ingestionId, account, cancellationToken);
return new SendPipeline(uploader);
var uploader = uploaderFactory.CreateInstance(projectId, ingestionId, account, uploadProgress, cancellationToken);
var diskStore = diskStoreFactory.CreateInstance(cancellationToken);
return new SendPipeline(uploader, diskStore);
}
}
@@ -23,10 +28,12 @@ public sealed class SendPipeline : IDisposable
{
private readonly Serializer _serializer = new();
private readonly Uploader _uploader;
private readonly DiskStore _diskStore;
internal SendPipeline(Uploader uploader)
internal SendPipeline(Uploader uploader, DiskStore diskStore)
{
_uploader = uploader;
_diskStore = diskStore;
}
private UploadItem _lastItem;
@@ -38,7 +45,7 @@ public sealed class SendPipeline : IDisposable
foreach (var item in results)
{
// we're not doing fire and forget here so that we get the backpressure from the uploader
await _uploader.PushAsync(item).ConfigureAwait(false);
await _diskStore.PushAsync(item).ConfigureAwait(false);
}
// NOTE: this is important to keep track of. When we serialze an object, we get back a list of objects, with the first one being the original root.
@@ -50,8 +57,17 @@ public sealed class SendPipeline : IDisposable
public async Task WaitForUpload()
{
await _uploader.PushAsync(_lastItem).ConfigureAwait(false);
await _uploader.CompleteAsync().ConfigureAwait(false);
await _diskStore.PushAsync(_lastItem).ConfigureAwait(false);
using DisposableFile tempFile = await _diskStore.CompleteAsync().ConfigureAwait(false);
using Stream fileStreamUpload = new FileStream(
tempFile.FileInfo.FullName,
FileMode.Open,
FileAccess.Read,
FileShare.Read
);
await _uploader.Send(fileStreamUpload).ConfigureAwait(false);
}
public async Task<string> WaitForUploadAndServerProcessing()
+15 -63
View File
@@ -1,11 +1,10 @@
using System.IO.Compression;
using System.Net.Http.Headers;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Speckle.InterfaceGenerator;
using Speckle.Newtonsoft.Json;
using Speckle.Sdk.Credentials;
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Pipelines.Progress;
namespace Speckle.Sdk.Pipelines.Send;
@@ -16,8 +15,9 @@ public sealed class UploaderFactory(ISpeckleHttp httpClientFactory, ILogger<Uplo
string projectId,
string ingestionId,
Account account,
IProgress<StreamProgressArgs> progress,
CancellationToken cancellationToken
) => new(projectId, ingestionId, logger, httpClientFactory, account, cancellationToken);
) => new(projectId, ingestionId, logger, httpClientFactory, account, progress, cancellationToken);
}
public sealed class Uploader : IDisposable
@@ -27,9 +27,8 @@ public sealed class Uploader : IDisposable
private readonly CancellationToken _cancellationToken;
private readonly HttpClient _speckleClient;
private readonly HttpClient _s3Client;
private readonly Channel<UploadItem> _channel;
private readonly Task<UploadResult> _sendTask;
private readonly ILogger<Uploader> _logger;
private readonly IProgress<StreamProgressArgs> _progress;
internal Uploader(
string projectId,
@@ -37,6 +36,7 @@ public sealed class Uploader : IDisposable
ILogger<Uploader> logger,
ISpeckleHttp httpClientFactory,
Account speckleAccount,
IProgress<StreamProgressArgs> progress,
CancellationToken cancellationToken
)
{
@@ -44,65 +44,19 @@ public sealed class Uploader : IDisposable
_ingestionId = ingestionId;
_logger = logger;
_cancellationToken = cancellationToken;
_speckleClient = httpClientFactory.CreateHttpClient(
null,
(int)TimeSpan.FromMinutes(30).TotalSeconds,
speckleAccount.token
);
_progress = progress;
_speckleClient = httpClientFactory.CreateHttpClient(authorizationToken: speckleAccount.token);
_speckleClient.BaseAddress = new(new(speckleAccount.serverInfo.url), "/api/v1/");
_s3Client = httpClientFactory.CreateHttpClient();
_channel = Channel.CreateBounded<UploadItem>(
new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.Wait, SingleReader = true }
);
_sendTask = Task.Run(SendLoopAsync, cancellationToken);
}
public ValueTask PushAsync(UploadItem item) => _channel.Writer.WriteAsync(item, _cancellationToken);
public async Task<string> CompleteAsync()
public async Task Send(Stream fileStream)
{
_channel.Writer.Complete();
var result = await _sendTask.ConfigureAwait(false);
return result.IngestionId;
}
private async Task<UploadResult> SendLoopAsync()
{
using DisposableFile tempFile = await WriteFile().ConfigureAwait(false);
PresignedUploadResponse presignedUploadResponse = await GetPresignedUrl().ConfigureAwait(false);
await UploadToS3(tempFile.FileInfo, presignedUploadResponse).ConfigureAwait(false);
await UploadToS3(fileStream, presignedUploadResponse).ConfigureAwait(false);
return await TriggerProcessing().ConfigureAwait(false);
}
/// <summary>
/// Reads from the Channel and streams the <see cref="UploadItem"/>s to a temporary file on disk.
/// Will keep reading until <see cref="CompleteAsync"/> is called.
/// </summary>
/// <returns>the file that was written</returns>
private async Task<DisposableFile> WriteFile()
{
string tempFilePath = Path.GetTempFileName();
_logger.LogInformation("Writing temp file to {TempFilePath}", tempFilePath);
using var fileStream = new FileStream(tempFilePath, FileMode.Create, FileAccess.Write, FileShare.None);
using var gzip = new GZipStream(fileStream, CompressionLevel.Optimal);
using var writer = new StreamWriter(gzip);
await foreach (var item in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false))
{
await writer.WriteLineAsync($"{item.Id}\t{item.Json}\t{item.SpeckleType}").ConfigureAwait(false);
}
#if NET8_0_OR_GREATER
await writer.FlushAsync(_cancellationToken).ConfigureAwait(false);
#else
await writer.FlushAsync().ConfigureAwait(false);
#endif
return new DisposableFile(new FileInfo(tempFilePath), _logger);
await TriggerProcessing().ConfigureAwait(false);
}
private async Task<PresignedUploadResponse> GetPresignedUrl()
@@ -123,15 +77,15 @@ public sealed class Uploader : IDisposable
return presignedUpload;
}
private async Task UploadToS3(FileInfo file, PresignedUploadResponse presignedUploadResponse)
private async Task UploadToS3(Stream fileStream, PresignedUploadResponse presignedUploadResponse)
{
using var fileStreamUpload = new FileStream(file.FullName, FileMode.Open, FileAccess.Read, FileShare.Read);
_logger.LogInformation("Uploading file to pre-signed url");
Stream progressStream = fileStreamUpload; // TODO: wrap with progress stream
Stream progressStream = new ProgressStream(fileStream, _progress);
using var streamContent = new StreamContent(progressStream);
streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
streamContent.Headers.ContentLength = file.Length;
streamContent.Headers.ContentLength = fileStream.Length;
using var uploadRequest = new HttpRequestMessage(HttpMethod.Put, presignedUploadResponse.Url);
foreach (var kvp in presignedUploadResponse.AdditionalRequestHeaders)
@@ -148,7 +102,7 @@ public sealed class Uploader : IDisposable
uploadResponse.EnsureSuccessStatusCode();
}
private async Task<UploadResult> TriggerProcessing()
private async Task TriggerProcessing()
{
Uri processUri = new($"projects/{_projectId}/modelingestion/{_ingestionId}/uploads/process", UriKind.Relative);
@@ -157,8 +111,6 @@ public sealed class Uploader : IDisposable
.ConfigureAwait(false);
processResponse.EnsureSuccessStatusCode();
return new UploadResult { IngestionId = _ingestionId };
}
public void Dispose()
@@ -16,8 +16,3 @@ internal record ProcessUploadResponse
{
public required string ingestionId { get; init; }
}
internal record UploadResult
{
public required string IngestionId { get; init; }
}
+3 -1
View File
@@ -8,6 +8,7 @@ using Speckle.Sdk.Dependencies;
using Speckle.Sdk.Host;
using Speckle.Sdk.Logging;
using Speckle.Sdk.Models.GraphTraversal;
using Speckle.Sdk.Pipelines.Progress;
using Speckle.Sdk.Serialisation.V2;
using Speckle.Sdk.Serialisation.V2.Receive;
using Speckle.Sdk.Serialisation.V2.Send;
@@ -96,7 +97,8 @@ public static class ServiceRegistration
typeof(DeserializeProcess),
typeof(ObjectLoader),
typeof(TraversalRule),
typeof(Client)
typeof(Client),
typeof(IngestionProgressManager)
);
serviceCollection.AddMatchingInterfacesAsTransient(typeof(GraphQLRetry).Assembly);
return serviceCollection;