blobs-poc

This commit is contained in:
JR-Morgan
2025-11-13 11:01:26 +03:00
parent b0da4510bf
commit 94d2a01880
10 changed files with 300 additions and 95 deletions
@@ -0,0 +1,41 @@
using System.Threading.Channels;
namespace Speckle.Sdk.Dependencies;
internal sealed class BroadcastChannel<T>
{
private readonly List<Channel<T>> _subscribers = [];
public ChannelReader<T> Subscribe()
{
var channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions() { SingleReader = true });
_subscribers.Add(channel);
return channel.Reader;
}
public async Task WriteAsync(T item, CancellationToken cancellationToken)
{
foreach (var sub in _subscribers)
{
await sub.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
}
}
public bool IsReadingCompleted()
{
return _subscribers.All(x => x.Reader.Completion.IsCompleted);
}
public void CompleteWriters()
{
foreach (var sub in _subscribers)
{
sub.Writer.Complete();
}
}
public async Task CompleteReaders()
{
await Task.WhenAll(_subscribers.Select(x => x.Reader.Completion)).ConfigureAwait(false);
}
}
@@ -6,28 +6,23 @@ namespace Speckle.Sdk.Serialisation.V2.Send;
public sealed class Batch<T> : IMemoryOwner<T>
where T : IHasByteSize
{
private static readonly Pool<List<T>> _pool = Pools.CreateListPool<T>();
#pragma warning disable IDE0032
private readonly List<T> _items = _pool.Get();
private int _batchByteSize;
#pragma warning restore IDE0032
private static readonly Pool<List<T>> s_pool = Pools.CreateListPool<T>();
public List<T> Items { get; } = s_pool.Get();
public int BatchByteSize { get; private set; }
public void Add(T item)
{
_items.Add(item);
_batchByteSize += item.ByteSize;
Items.Add(item);
BatchByteSize += item.ByteSize;
}
public void TrimExcess()
{
_items.TrimExcess();
_batchByteSize = _items.Sum(x => x.ByteSize);
Items.TrimExcess();
BatchByteSize = Items.Sum(x => x.ByteSize);
}
public int BatchByteSize => _batchByteSize;
public List<T> Items => _items;
public void Dispose() => s_pool.Return(Items);
public void Dispose() => _pool.Return(_items);
public Memory<T> Memory => new(_items.ToArray());
public Memory<T> Memory => new(Items.ToArray());
}
@@ -1,74 +1,134 @@
using System.Buffers;
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Serialisation.V2.Send;
namespace Speckle.Sdk.Dependencies.Serialization;
public abstract class ChannelSaver<T>
where T : IHasByteSize
public abstract class ChannelSaver<TItem, TBlobItem>
where TItem : IHasByteSize
where TBlobItem : IHasByteSize, TItem
{
private const int SEND_CAPACITY = 10000;
private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes
private const int BLOB_SEND_CHUNK_SIZE = 10; //count
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
private const int MAX_PARALLELISM_HTTP = 4;
private const int HTTP_CAPACITY = 500;
private const int MAX_CACHE_WRITE_PARALLELISM = 1;
private const int MAX_CACHE_BATCH = 1000;
private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
new BoundedChannelOptions(SEND_CAPACITY)
{
AllowSynchronousContinuations = true,
Capacity = SEND_CAPACITY,
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait,
},
_ => throw new NotImplementedException("Dropping items not supported.")
);
private readonly BroadcastChannel<TItem> _broadcastChannel = new();
public Task Start(
public async Task Start(
int? maxParallelism,
int? httpBatchSize,
int? blobSendCache,
int? cacheBatchSize,
CancellationToken cancellationToken
) =>
_checkCacheChannel
.Reader.BatchByByteSize(httpBatchSize ?? HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
maxParallelism ?? MAX_PARALLELISM_HTTP,
async x => await SendToServer(x).ConfigureAwait(false),
HTTP_CAPACITY,
false,
)
{
maxParallelism ??= MAX_PARALLELISM_HTTP;
httpBatchSize ??= HTTP_SEND_CHUNK_SIZE;
blobSendCache ??= BLOB_SEND_CHUNK_SIZE;
cacheBatchSize ??= MAX_CACHE_BATCH;
await StartInternal(
maxParallelism.Value,
httpBatchSize.Value,
blobSendCache.Value,
cacheBatchSize.Value,
cancellationToken
)
.Join()
.Batch(cacheBatchSize ?? MAX_CACHE_BATCH, singleReader: true)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
.ContinueWith(
t =>
{
Exception? ex = t.Exception;
if (ex is null && t.Status is TaskStatus.Canceled && !cancellationToken.IsCancellationRequested)
{
ex = new OperationCanceledException();
}
.ConfigureAwait(false);
}
if (ex is not null)
{
RecordException(ex);
}
_checkCacheChannel.Writer.TryComplete(ex);
},
cancellationToken,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Current
private Task StartInternal(
int maxParallelism,
int httpBatchSize,
int blobSendCache,
int cacheBatchSize,
CancellationToken cancellationToken
)
{
Task serverSend = _broadcastChannel
.Subscribe()
.BatchByByteSize(httpBatchSize)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrentlyAsync(
maxParallelism,
async x => await SendToServer(x).ConfigureAwait(false),
cancellationToken
);
public async Task SaveAsync(T item, CancellationToken cancellationToken)
Task writeCache = _broadcastChannel
.Subscribe()
.Batch(cacheBatchSize)
.ReadAll(SaveToCache, true, cancellationToken: cancellationToken)
.AsTask();
Task blobsCache = _broadcastChannel
.Subscribe()
.OfType<TItem, TBlobItem>()
.BatchByByteSize(blobSendCache)
.ReadAllAsync(
async x => await SendBlobToServer(x).ConfigureAwait(false),
true,
cancellationToken: cancellationToken
)
.AsTask();
return Task.WhenAll(serverSend, writeCache, blobsCache);
// return _broadcastChannel
// .Subscribe()
// .BatchByByteSize(httpBatchSize ?? HTTP_SEND_CHUNK_SIZE)
// .WithTimeout(HTTP_BATCH_TIMEOUT)
// .PipeAsync(
// maxParallelism ?? MAX_PARALLELISM_HTTP,
// async x => await SendToServer(x).ConfigureAwait(false),
// HTTP_CAPACITY,
// false,
// cancellationToken
// )
// .Join()
// .Batch(cacheBatchSize ?? MAX_CACHE_BATCH, singleReader: true)
// .WithTimeout(HTTP_BATCH_TIMEOUT)
// .ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
// .ContinueWith(
// t =>
// {
// Exception? ex = t.Exception;
// if (ex is null && t.Status is TaskStatus.Canceled && !cancellationToken.IsCancellationRequested)
// {
// ex = new OperationCanceledException();
// }
//
// if (ex is not null)
// {
// RecordException(ex);
// }
//
// _checkCacheChannel.Writer.TryComplete(ex);
// },
// cancellationToken,
// TaskContinuationOptions.ExecuteSynchronously,
// TaskScheduler.Current
// );
}
private async ValueTask SendBlobToServer(IMemoryOwner<TBlobItem> batch)
{
try
{
await SendBlobToServerInternal((Batch<TBlobItem>)batch).ConfigureAwait(false);
}
#pragma warning disable CA1031
catch (Exception ex)
#pragma warning restore CA1031
{
RecordException(ex);
}
}
protected abstract Task SendBlobToServerInternal(Batch<TBlobItem> batch);
public async Task SaveAsync(TItem item, CancellationToken cancellationToken)
{
if (Exception is not null)
{
@@ -76,36 +136,34 @@ public abstract class ChannelSaver<T>
}
//can switch to check then try pattern when back pressure is needed or exceptions are too much
//the trees don't need to respond to back pressure
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
await _broadcastChannel.WriteAsync(item, cancellationToken).ConfigureAwait(false);
}
private async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch)
private async Task SendToServer(IMemoryOwner<TItem> batch)
{
try
{
await SendToServerInternal((Batch<T>)batch).ConfigureAwait(false);
return batch;
await SendToServerInternal((Batch<TItem>)batch).ConfigureAwait(false);
}
#pragma warning disable CA1031
catch (Exception ex)
#pragma warning restore CA1031
{
RecordException(ex);
return batch;
}
}
protected abstract Task SendToServerInternal(Batch<T> batch);
protected abstract Task SendToServerInternal(Batch<TItem> batch);
public abstract void SaveToCache(List<T> item);
public abstract void SaveToCache(List<TItem> item);
public void DoneTraversing() => _checkCacheChannel.Writer.TryComplete();
public void DoneTraversing() => _broadcastChannel.CompleteWriters();
public async Task DoneSaving()
{
if (!_checkCacheChannel.Reader.Completion.IsCompleted)
if (!_broadcastChannel.IsReadingCompleted())
{
await _checkCacheChannel.Reader.Completion.ConfigureAwait(false);
await _broadcastChannel.CompleteReaders().ConfigureAwait(false);
}
}
@@ -114,6 +172,5 @@ public abstract class ChannelSaver<T>
private void RecordException(Exception ex)
{
Exception = ex;
_checkCacheChannel.Writer.TryComplete(ex);
}
}
+12 -8
View File
@@ -1,38 +1,42 @@
using System.Runtime.Serialization;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.Serialization;
using Speckle.Newtonsoft.Json;
namespace Speckle.Sdk.Models;
[SpeckleType("Speckle.Core.Models.Blob")]
public class Blob : Base
public sealed class Blob : Base
{
[JsonIgnore]
public static int LocalHashPrefixLength => 20;
private string _filePath;
private string _hash;
private string? _hash;
private bool _isHashExpired = true;
public Blob() { }
[SetsRequiredMembers]
public Blob(string filePath)
{
this.filePath = filePath;
this.originalPath = filePath;
}
public string filePath
public required string filePath
{
get => _filePath;
set
{
originalPath ??= value;
_filePath = value;
_isHashExpired = true;
}
}
public string originalPath { get; set; }
public required string originalPath { get; set; }
[JsonIgnore]
public FileInfo FileInfo => new(filePath);
/// <summary>
/// For blobs, the id is the same as the file hash. Please note, when deserialising, the id will be set from the original hash generated on sending.
@@ -45,7 +49,7 @@ public class Blob : Base
public string? GetFileHash()
{
if ((_isHashExpired || _hash == null) && filePath != null)
if ((_isHashExpired || _hash == null))
{
_hash = HashUtility.HashFile(filePath);
}
@@ -1,12 +1,13 @@
using System.Text;
using Speckle.Sdk.Models;
namespace Speckle.Sdk.Serialisation.V2.Send;
public sealed record BaseItem(Id Id, Json Json, bool NeedsStorage, Dictionary<Id, int>? Closures) : IHasByteSize
public record BaseItem(Id Id, Json Json, bool NeedsStorage, Dictionary<Id, int>? Closures) : IHasByteSize
{
public int ByteSize { get; } = Encoding.UTF8.GetByteCount(Json.Value);
public virtual int ByteSize { get; } = Encoding.UTF8.GetByteCount(Json.Value);
public bool Equals(BaseItem? other)
public virtual bool Equals(BaseItem? other)
{
if (other is null)
{
@@ -17,3 +18,10 @@ public sealed record BaseItem(Id Id, Json Json, bool NeedsStorage, Dictionary<Id
public override int GetHashCode() => Id.GetHashCode();
}
public sealed record BlobItem(Id Id, Json Json, bool NeedsStorage, Dictionary<Id, int>? Closures, Blob Blob)
: BaseItem(Id, Json, NeedsStorage, Closures)
{
public Blob Blob { get; } = Blob;
public override int ByteSize { get; } = (int)Blob.FileInfo.Length;
}
@@ -9,7 +9,13 @@ namespace Speckle.Sdk.Serialisation.V2.Send;
public interface IObjectSaver : IDisposable
{
Exception? Exception { get; set; }
Task Start(int? maxParallelism, int? httpBatchSize, int? cacheBatchSize, CancellationToken cancellationToken);
Task Start(
int? maxParallelism,
int? httpBatchSize,
int? blobBatchSize,
int? cacheBatchSize,
CancellationToken cancellationToken
);
void DoneTraversing();
Task DoneSaving();
Task SaveAsync(BaseItem item);
@@ -19,14 +25,11 @@ public sealed class ObjectSaver(
IProgress<ProgressArgs>? progress,
ISqLiteJsonCacheManager sqLiteJsonCacheManager,
IServerObjectManager serverObjectManager,
IServerBlobManager serverBlobManager,
ILogger<ObjectSaver> logger,
SerializeProcessOptions options,
CancellationToken cancellationToken
#pragma warning disable CS9107
#pragma warning disable CA2254
) : ChannelSaver<BaseItem>, IObjectSaver
#pragma warning restore CA2254
#pragma warning restore CS9107
) : ChannelSaver<BaseItem, BlobItem>, IObjectSaver
{
private readonly CancellationTokenSource _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken
@@ -40,6 +43,23 @@ public sealed class ObjectSaver(
private long _objectsSerialized;
private bool _disposed;
protected override async Task SendBlobToServerInternal(Batch<BlobItem> batch)
{
var objectBatch = batch.Items.Distinct().Select(x => x.Blob).ToList();
// var hasObjects = await serverBlobManager
// .HasObjects(objectBatch.Select(x => x.Id.Value).Freeze(), _cancellationTokenSource.Token)
// .ConfigureAwait(false);
// objectBatch = batch.Items.Where(x => !hasObjects[x.Id.Value]).ToList();
if (objectBatch.Count != 0)
{
// Interlocked.Add(ref _uploading, batch.Items.Count);
// progress?.Report(new(ProgressEvent.UploadingObjects, _uploading, null));
await serverBlobManager
.UploadBlobs(objectBatch, true, progress, _cancellationTokenSource.Token)
.ConfigureAwait(false);
}
}
protected override async Task SendToServerInternal(Batch<BaseItem> batch)
{
if (IsCancelled())
@@ -17,6 +17,7 @@ public record SerializeProcessOptions(
{
public int? MaxHttpSendBatchSize { get; set; }
public int? MaxCacheBatchSize { get; set; }
public int? MaxBlobBatchSize { get; set; }
public int? MaxParallelism { get; set; }
}
@@ -109,6 +110,7 @@ public sealed class SerializeProcess(
var channelTask = objectSaver.Start(
options.MaxParallelism,
options.MaxHttpSendBatchSize,
options.MaxBlobBatchSize,
options.MaxCacheBatchSize,
_processSource.Token
);
@@ -13,26 +13,29 @@ public class SerializeProcessFactory(
IObjectSerializerFactory objectSerializerFactory,
ISqLiteJsonCacheManagerFactory sqLiteJsonCacheManagerFactory,
IServerObjectManagerFactory serverObjectManagerFactory,
IServerBlobManagerFactory serverBlobManagerFactory,
ILoggerFactory loggerFactory
) : ISerializeProcessFactory
{
public ISerializeProcess CreateSerializeProcess(
Uri url,
string streamId,
string projectId,
string? authorizationToken,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken,
SerializeProcessOptions? options = null
)
{
var sqLiteJsonCacheManager = sqLiteJsonCacheManagerFactory.CreateFromStream(streamId);
var serverObjectManager = serverObjectManagerFactory.Create(url, streamId, authorizationToken);
return CreateSerializeProcess(sqLiteJsonCacheManager, serverObjectManager, progress, cancellationToken, options);
var sqLiteJsonCacheManager = sqLiteJsonCacheManagerFactory.CreateFromStream(projectId);
var serverObjectManager = serverObjectManagerFactory.Create(url, projectId, authorizationToken);
var serverBlobManager = serverBlobManagerFactory.Create(url, projectId, authorizationToken);
return CreateSerializeProcess(sqLiteJsonCacheManager, serverObjectManager, serverBlobManager, progress, cancellationToken, options);
}
public ISerializeProcess CreateSerializeProcess(
ISqLiteJsonCacheManager sqLiteJsonCacheManager,
IServerObjectManager serverObjectManager,
IServerBlobManager serverBlobManager,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken,
SerializeProcessOptions? options = null
@@ -43,6 +46,7 @@ public class SerializeProcessFactory(
progress,
sqLiteJsonCacheManager,
serverObjectManager,
serverBlobManager,
loggerFactory.CreateLogger<ObjectSaver>(),
options ?? new SerializeProcessOptions(),
cancellationToken
@@ -68,6 +72,7 @@ public class SerializeProcessFactory(
return CreateSerializeProcess(
memoryJsonCacheManager,
new MemoryServerObjectManager(objects),
null!, //this would need a better solution
progress,
cancellationToken,
options
@@ -0,0 +1,22 @@
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Logging;
namespace Speckle.Sdk.Serialisation.V2;
[GenerateAutoInterface]
public class ServerBlobManagerFactory(ISpeckleHttp speckleHttp, ISdkActivityFactory activityFactory)
: IServerBlobManagerFactory
{
public IServerBlobManager Create(
Uri serverUrl,
string projectId,
string? authorizationToken,
TimeSpan? timeout = null
)
{
var client = speckleHttp.CreateHttpClient(authorizationToken: authorizationToken);
client.BaseAddress = serverUrl;
return new ServerBlobManager(client);
}
}
@@ -0,0 +1,51 @@
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Transports;
using Speckle.Sdk.Transports.ServerUtils;
namespace Speckle.Sdk.Serialisation.V2;
[GenerateAutoInterface(VisibilityModifier = "public")]
internal sealed class ServerBlobManager : IServerBlobManager
{
private readonly HttpClient _authorizedClient;
public ServerBlobManager(HttpClient authorizedClient)
{
_authorizedClient = authorizedClient;
}
public async Task UploadBlobs(
string projectId,
IReadOnlyCollection<(string blobId, string filePath)> objects,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
)
{
if (objects.Count == 0)
{
return;
}
var multipartFormDataContent = new MultipartFormDataContent();
foreach (var (id, filePath) in objects)
{
var fileName = Path.GetFileName(filePath);
var stream = File.OpenRead(filePath);
StreamContent fsc = new(stream);
var hash = id.Split(':')[1];
multipartFormDataContent.Add(fsc, $"hash:{hash}", fileName);
cancellationToken.ThrowIfCancellationRequested();
}
using var message = new HttpRequestMessage();
message.RequestUri = new Uri($"/api/stream/{projectId}/blob", UriKind.Relative);
message.Method = HttpMethod.Post;
message.Content = new ProgressContent(multipartFormDataContent, progress);
using var response = await _authorizedClient.SendAsync(message, cancellationToken).ConfigureAwait(false);
response.EnsureSuccessStatusCode();
}
}