diff --git a/src/Speckle.Sdk.Dependencies/BroadcastChannel.cs b/src/Speckle.Sdk.Dependencies/BroadcastChannel.cs new file mode 100644 index 00000000..c83f62b9 --- /dev/null +++ b/src/Speckle.Sdk.Dependencies/BroadcastChannel.cs @@ -0,0 +1,41 @@ +using System.Threading.Channels; + +namespace Speckle.Sdk.Dependencies; + +internal sealed class BroadcastChannel +{ + private readonly List> _subscribers = []; + + public ChannelReader Subscribe() + { + var channel = Channel.CreateUnbounded(new UnboundedChannelOptions() { SingleReader = true }); + _subscribers.Add(channel); + return channel.Reader; + } + + public async Task WriteAsync(T item, CancellationToken cancellationToken) + { + foreach (var sub in _subscribers) + { + await sub.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false); + } + } + + public bool IsReadingCompleted() + { + return _subscribers.All(x => x.Reader.Completion.IsCompleted); + } + + public void CompleteWriters() + { + foreach (var sub in _subscribers) + { + sub.Writer.Complete(); + } + } + + public async Task CompleteReaders() + { + await Task.WhenAll(_subscribers.Select(x => x.Reader.Completion)).ConfigureAwait(false); + } +} diff --git a/src/Speckle.Sdk.Dependencies/Serialization/Batch.cs b/src/Speckle.Sdk.Dependencies/Serialization/Batch.cs index b65a406b..0bdc21a7 100644 --- a/src/Speckle.Sdk.Dependencies/Serialization/Batch.cs +++ b/src/Speckle.Sdk.Dependencies/Serialization/Batch.cs @@ -6,28 +6,23 @@ namespace Speckle.Sdk.Serialisation.V2.Send; public sealed class Batch : IMemoryOwner where T : IHasByteSize { - private static readonly Pool> _pool = Pools.CreateListPool(); -#pragma warning disable IDE0032 - private readonly List _items = _pool.Get(); - private int _batchByteSize; -#pragma warning restore IDE0032 + private static readonly Pool> s_pool = Pools.CreateListPool(); + public List Items { get; } = s_pool.Get(); + public int BatchByteSize { get; private set; } public void Add(T item) { - _items.Add(item); - _batchByteSize += item.ByteSize; + Items.Add(item); + BatchByteSize += item.ByteSize; } public void TrimExcess() { - _items.TrimExcess(); - _batchByteSize = _items.Sum(x => x.ByteSize); + Items.TrimExcess(); + BatchByteSize = Items.Sum(x => x.ByteSize); } - public int BatchByteSize => _batchByteSize; - public List Items => _items; + public void Dispose() => s_pool.Return(Items); - public void Dispose() => _pool.Return(_items); - - public Memory Memory => new(_items.ToArray()); + public Memory Memory => new(Items.ToArray()); } diff --git a/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs b/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs index b688f3a2..79707078 100644 --- a/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs +++ b/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs @@ -1,74 +1,134 @@ using System.Buffers; -using System.Threading.Channels; using Open.ChannelExtensions; using Speckle.Sdk.Serialisation.V2.Send; namespace Speckle.Sdk.Dependencies.Serialization; -public abstract class ChannelSaver - where T : IHasByteSize +public abstract class ChannelSaver + where TItem : IHasByteSize + where TBlobItem : IHasByteSize, TItem { - private const int SEND_CAPACITY = 10000; private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes + private const int BLOB_SEND_CHUNK_SIZE = 10; //count private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2); private const int MAX_PARALLELISM_HTTP = 4; - private const int HTTP_CAPACITY = 500; - private const int MAX_CACHE_WRITE_PARALLELISM = 1; private const int MAX_CACHE_BATCH = 1000; - private readonly Channel _checkCacheChannel = Channel.CreateBounded( - new BoundedChannelOptions(SEND_CAPACITY) - { - AllowSynchronousContinuations = true, - Capacity = SEND_CAPACITY, - SingleWriter = false, - SingleReader = false, - FullMode = BoundedChannelFullMode.Wait, - }, - _ => throw new NotImplementedException("Dropping items not supported.") - ); + private readonly BroadcastChannel _broadcastChannel = new(); - public Task Start( + public async Task Start( int? maxParallelism, int? httpBatchSize, + int? blobSendCache, int? cacheBatchSize, CancellationToken cancellationToken - ) => - _checkCacheChannel - .Reader.BatchByByteSize(httpBatchSize ?? HTTP_SEND_CHUNK_SIZE) - .WithTimeout(HTTP_BATCH_TIMEOUT) - .PipeAsync( - maxParallelism ?? MAX_PARALLELISM_HTTP, - async x => await SendToServer(x).ConfigureAwait(false), - HTTP_CAPACITY, - false, + ) + { + maxParallelism ??= MAX_PARALLELISM_HTTP; + httpBatchSize ??= HTTP_SEND_CHUNK_SIZE; + blobSendCache ??= BLOB_SEND_CHUNK_SIZE; + cacheBatchSize ??= MAX_CACHE_BATCH; + await StartInternal( + maxParallelism.Value, + httpBatchSize.Value, + blobSendCache.Value, + cacheBatchSize.Value, cancellationToken ) - .Join() - .Batch(cacheBatchSize ?? MAX_CACHE_BATCH, singleReader: true) - .WithTimeout(HTTP_BATCH_TIMEOUT) - .ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken) - .ContinueWith( - t => - { - Exception? ex = t.Exception; - if (ex is null && t.Status is TaskStatus.Canceled && !cancellationToken.IsCancellationRequested) - { - ex = new OperationCanceledException(); - } + .ConfigureAwait(false); + } - if (ex is not null) - { - RecordException(ex); - } - _checkCacheChannel.Writer.TryComplete(ex); - }, - cancellationToken, - TaskContinuationOptions.ExecuteSynchronously, - TaskScheduler.Current + private Task StartInternal( + int maxParallelism, + int httpBatchSize, + int blobSendCache, + int cacheBatchSize, + CancellationToken cancellationToken + ) + { + Task serverSend = _broadcastChannel + .Subscribe() + .BatchByByteSize(httpBatchSize) + .WithTimeout(HTTP_BATCH_TIMEOUT) + .ReadAllConcurrentlyAsync( + maxParallelism, + async x => await SendToServer(x).ConfigureAwait(false), + cancellationToken ); - public async Task SaveAsync(T item, CancellationToken cancellationToken) + Task writeCache = _broadcastChannel + .Subscribe() + .Batch(cacheBatchSize) + .ReadAll(SaveToCache, true, cancellationToken: cancellationToken) + .AsTask(); + + Task blobsCache = _broadcastChannel + .Subscribe() + .OfType() + .BatchByByteSize(blobSendCache) + .ReadAllAsync( + async x => await SendBlobToServer(x).ConfigureAwait(false), + true, + cancellationToken: cancellationToken + ) + .AsTask(); + + return Task.WhenAll(serverSend, writeCache, blobsCache); + + // return _broadcastChannel + // .Subscribe() + // .BatchByByteSize(httpBatchSize ?? HTTP_SEND_CHUNK_SIZE) + // .WithTimeout(HTTP_BATCH_TIMEOUT) + // .PipeAsync( + // maxParallelism ?? MAX_PARALLELISM_HTTP, + // async x => await SendToServer(x).ConfigureAwait(false), + // HTTP_CAPACITY, + // false, + // cancellationToken + // ) + // .Join() + // .Batch(cacheBatchSize ?? MAX_CACHE_BATCH, singleReader: true) + // .WithTimeout(HTTP_BATCH_TIMEOUT) + // .ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken) + // .ContinueWith( + // t => + // { + // Exception? ex = t.Exception; + // if (ex is null && t.Status is TaskStatus.Canceled && !cancellationToken.IsCancellationRequested) + // { + // ex = new OperationCanceledException(); + // } + // + // if (ex is not null) + // { + // RecordException(ex); + // } + // + // _checkCacheChannel.Writer.TryComplete(ex); + // }, + // cancellationToken, + // TaskContinuationOptions.ExecuteSynchronously, + // TaskScheduler.Current + // ); + } + + private async ValueTask SendBlobToServer(IMemoryOwner batch) + { + try + { + await SendBlobToServerInternal((Batch)batch).ConfigureAwait(false); + } +#pragma warning disable CA1031 + catch (Exception ex) +#pragma warning restore CA1031 + { + RecordException(ex); + } + } + + protected abstract Task SendBlobToServerInternal(Batch batch); + + public async Task SaveAsync(TItem item, CancellationToken cancellationToken) { if (Exception is not null) { @@ -76,36 +136,34 @@ public abstract class ChannelSaver } //can switch to check then try pattern when back pressure is needed or exceptions are too much //the trees don't need to respond to back pressure - await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false); + await _broadcastChannel.WriteAsync(item, cancellationToken).ConfigureAwait(false); } - private async Task> SendToServer(IMemoryOwner batch) + private async Task SendToServer(IMemoryOwner batch) { try { - await SendToServerInternal((Batch)batch).ConfigureAwait(false); - return batch; + await SendToServerInternal((Batch)batch).ConfigureAwait(false); } #pragma warning disable CA1031 catch (Exception ex) #pragma warning restore CA1031 { RecordException(ex); - return batch; } } - protected abstract Task SendToServerInternal(Batch batch); + protected abstract Task SendToServerInternal(Batch batch); - public abstract void SaveToCache(List item); + public abstract void SaveToCache(List item); - public void DoneTraversing() => _checkCacheChannel.Writer.TryComplete(); + public void DoneTraversing() => _broadcastChannel.CompleteWriters(); public async Task DoneSaving() { - if (!_checkCacheChannel.Reader.Completion.IsCompleted) + if (!_broadcastChannel.IsReadingCompleted()) { - await _checkCacheChannel.Reader.Completion.ConfigureAwait(false); + await _broadcastChannel.CompleteReaders().ConfigureAwait(false); } } @@ -114,6 +172,5 @@ public abstract class ChannelSaver private void RecordException(Exception ex) { Exception = ex; - _checkCacheChannel.Writer.TryComplete(ex); } } diff --git a/src/Speckle.Sdk/Models/Blob.cs b/src/Speckle.Sdk/Models/Blob.cs index 540d1ca0..638402bf 100644 --- a/src/Speckle.Sdk/Models/Blob.cs +++ b/src/Speckle.Sdk/Models/Blob.cs @@ -1,38 +1,42 @@ -using System.Runtime.Serialization; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.Serialization; using Speckle.Newtonsoft.Json; namespace Speckle.Sdk.Models; [SpeckleType("Speckle.Core.Models.Blob")] -public class Blob : Base +public sealed class Blob : Base { [JsonIgnore] public static int LocalHashPrefixLength => 20; private string _filePath; - private string _hash; + private string? _hash; private bool _isHashExpired = true; public Blob() { } + [SetsRequiredMembers] public Blob(string filePath) { this.filePath = filePath; + this.originalPath = filePath; } - public string filePath + public required string filePath { get => _filePath; set { - originalPath ??= value; - _filePath = value; _isHashExpired = true; } } - public string originalPath { get; set; } + public required string originalPath { get; set; } + + [JsonIgnore] + public FileInfo FileInfo => new(filePath); /// /// For blobs, the id is the same as the file hash. Please note, when deserialising, the id will be set from the original hash generated on sending. @@ -45,7 +49,7 @@ public class Blob : Base public string? GetFileHash() { - if ((_isHashExpired || _hash == null) && filePath != null) + if ((_isHashExpired || _hash == null)) { _hash = HashUtility.HashFile(filePath); } diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/BaseItem.cs b/src/Speckle.Sdk/Serialisation/V2/Send/BaseItem.cs index 2a88e3db..172c01db 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/BaseItem.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/BaseItem.cs @@ -1,12 +1,13 @@ using System.Text; +using Speckle.Sdk.Models; namespace Speckle.Sdk.Serialisation.V2.Send; -public sealed record BaseItem(Id Id, Json Json, bool NeedsStorage, Dictionary? Closures) : IHasByteSize +public record BaseItem(Id Id, Json Json, bool NeedsStorage, Dictionary? Closures) : IHasByteSize { - public int ByteSize { get; } = Encoding.UTF8.GetByteCount(Json.Value); + public virtual int ByteSize { get; } = Encoding.UTF8.GetByteCount(Json.Value); - public bool Equals(BaseItem? other) + public virtual bool Equals(BaseItem? other) { if (other is null) { @@ -17,3 +18,10 @@ public sealed record BaseItem(Id Id, Json Json, bool NeedsStorage, Dictionary Id.GetHashCode(); } + +public sealed record BlobItem(Id Id, Json Json, bool NeedsStorage, Dictionary? Closures, Blob Blob) + : BaseItem(Id, Json, NeedsStorage, Closures) +{ + public Blob Blob { get; } = Blob; + public override int ByteSize { get; } = (int)Blob.FileInfo.Length; +} diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs index 19ccca3d..57bd09b9 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs @@ -9,7 +9,13 @@ namespace Speckle.Sdk.Serialisation.V2.Send; public interface IObjectSaver : IDisposable { Exception? Exception { get; set; } - Task Start(int? maxParallelism, int? httpBatchSize, int? cacheBatchSize, CancellationToken cancellationToken); + Task Start( + int? maxParallelism, + int? httpBatchSize, + int? blobBatchSize, + int? cacheBatchSize, + CancellationToken cancellationToken + ); void DoneTraversing(); Task DoneSaving(); Task SaveAsync(BaseItem item); @@ -19,14 +25,11 @@ public sealed class ObjectSaver( IProgress? progress, ISqLiteJsonCacheManager sqLiteJsonCacheManager, IServerObjectManager serverObjectManager, + IServerBlobManager serverBlobManager, ILogger logger, SerializeProcessOptions options, CancellationToken cancellationToken -#pragma warning disable CS9107 -#pragma warning disable CA2254 -) : ChannelSaver, IObjectSaver -#pragma warning restore CA2254 -#pragma warning restore CS9107 +) : ChannelSaver, IObjectSaver { private readonly CancellationTokenSource _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource( cancellationToken @@ -40,6 +43,23 @@ public sealed class ObjectSaver( private long _objectsSerialized; private bool _disposed; + protected override async Task SendBlobToServerInternal(Batch batch) + { + var objectBatch = batch.Items.Distinct().Select(x => x.Blob).ToList(); + // var hasObjects = await serverBlobManager + // .HasObjects(objectBatch.Select(x => x.Id.Value).Freeze(), _cancellationTokenSource.Token) + // .ConfigureAwait(false); + // objectBatch = batch.Items.Where(x => !hasObjects[x.Id.Value]).ToList(); + if (objectBatch.Count != 0) + { + // Interlocked.Add(ref _uploading, batch.Items.Count); + // progress?.Report(new(ProgressEvent.UploadingObjects, _uploading, null)); + await serverBlobManager + .UploadBlobs(objectBatch, true, progress, _cancellationTokenSource.Token) + .ConfigureAwait(false); + } + } + protected override async Task SendToServerInternal(Batch batch) { if (IsCancelled()) diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs b/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs index 6dac6f6f..759d9990 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs @@ -17,6 +17,7 @@ public record SerializeProcessOptions( { public int? MaxHttpSendBatchSize { get; set; } public int? MaxCacheBatchSize { get; set; } + public int? MaxBlobBatchSize { get; set; } public int? MaxParallelism { get; set; } } @@ -109,6 +110,7 @@ public sealed class SerializeProcess( var channelTask = objectSaver.Start( options.MaxParallelism, options.MaxHttpSendBatchSize, + options.MaxBlobBatchSize, options.MaxCacheBatchSize, _processSource.Token ); diff --git a/src/Speckle.Sdk/Serialisation/V2/SerializeProcessFactory.cs b/src/Speckle.Sdk/Serialisation/V2/SerializeProcessFactory.cs index b708f4d4..874ee0c5 100644 --- a/src/Speckle.Sdk/Serialisation/V2/SerializeProcessFactory.cs +++ b/src/Speckle.Sdk/Serialisation/V2/SerializeProcessFactory.cs @@ -13,26 +13,29 @@ public class SerializeProcessFactory( IObjectSerializerFactory objectSerializerFactory, ISqLiteJsonCacheManagerFactory sqLiteJsonCacheManagerFactory, IServerObjectManagerFactory serverObjectManagerFactory, + IServerBlobManagerFactory serverBlobManagerFactory, ILoggerFactory loggerFactory ) : ISerializeProcessFactory { public ISerializeProcess CreateSerializeProcess( Uri url, - string streamId, + string projectId, string? authorizationToken, IProgress? progress, CancellationToken cancellationToken, SerializeProcessOptions? options = null ) { - var sqLiteJsonCacheManager = sqLiteJsonCacheManagerFactory.CreateFromStream(streamId); - var serverObjectManager = serverObjectManagerFactory.Create(url, streamId, authorizationToken); - return CreateSerializeProcess(sqLiteJsonCacheManager, serverObjectManager, progress, cancellationToken, options); + var sqLiteJsonCacheManager = sqLiteJsonCacheManagerFactory.CreateFromStream(projectId); + var serverObjectManager = serverObjectManagerFactory.Create(url, projectId, authorizationToken); + var serverBlobManager = serverBlobManagerFactory.Create(url, projectId, authorizationToken); + return CreateSerializeProcess(sqLiteJsonCacheManager, serverObjectManager, serverBlobManager, progress, cancellationToken, options); } public ISerializeProcess CreateSerializeProcess( ISqLiteJsonCacheManager sqLiteJsonCacheManager, IServerObjectManager serverObjectManager, + IServerBlobManager serverBlobManager, IProgress? progress, CancellationToken cancellationToken, SerializeProcessOptions? options = null @@ -43,6 +46,7 @@ public class SerializeProcessFactory( progress, sqLiteJsonCacheManager, serverObjectManager, + serverBlobManager, loggerFactory.CreateLogger(), options ?? new SerializeProcessOptions(), cancellationToken @@ -68,6 +72,7 @@ public class SerializeProcessFactory( return CreateSerializeProcess( memoryJsonCacheManager, new MemoryServerObjectManager(objects), + null!, //this would need a better solution progress, cancellationToken, options diff --git a/src/Speckle.Sdk/Serialisation/V2/ServerBlobManagerFactory.cs b/src/Speckle.Sdk/Serialisation/V2/ServerBlobManagerFactory.cs new file mode 100644 index 00000000..0259af62 --- /dev/null +++ b/src/Speckle.Sdk/Serialisation/V2/ServerBlobManagerFactory.cs @@ -0,0 +1,22 @@ +using Speckle.InterfaceGenerator; +using Speckle.Sdk.Helpers; +using Speckle.Sdk.Logging; + +namespace Speckle.Sdk.Serialisation.V2; + +[GenerateAutoInterface] +public class ServerBlobManagerFactory(ISpeckleHttp speckleHttp, ISdkActivityFactory activityFactory) + : IServerBlobManagerFactory +{ + public IServerBlobManager Create( + Uri serverUrl, + string projectId, + string? authorizationToken, + TimeSpan? timeout = null + ) + { + var client = speckleHttp.CreateHttpClient(authorizationToken: authorizationToken); + client.BaseAddress = serverUrl; + return new ServerBlobManager(client); + } +} diff --git a/src/Speckle.Sdk/Serialisation/V2/ServerObjectBlobManager.cs b/src/Speckle.Sdk/Serialisation/V2/ServerObjectBlobManager.cs new file mode 100644 index 00000000..6b9fb7c7 --- /dev/null +++ b/src/Speckle.Sdk/Serialisation/V2/ServerObjectBlobManager.cs @@ -0,0 +1,51 @@ +using Speckle.InterfaceGenerator; +using Speckle.Sdk.Helpers; +using Speckle.Sdk.Transports; +using Speckle.Sdk.Transports.ServerUtils; + +namespace Speckle.Sdk.Serialisation.V2; + +[GenerateAutoInterface(VisibilityModifier = "public")] +internal sealed class ServerBlobManager : IServerBlobManager +{ + private readonly HttpClient _authorizedClient; + + public ServerBlobManager(HttpClient authorizedClient) + { + _authorizedClient = authorizedClient; + } + + public async Task UploadBlobs( + string projectId, + IReadOnlyCollection<(string blobId, string filePath)> objects, + IProgress? progress, + CancellationToken cancellationToken + ) + { + if (objects.Count == 0) + { + return; + } + + var multipartFormDataContent = new MultipartFormDataContent(); + foreach (var (id, filePath) in objects) + { + var fileName = Path.GetFileName(filePath); + var stream = File.OpenRead(filePath); + StreamContent fsc = new(stream); + var hash = id.Split(':')[1]; + + multipartFormDataContent.Add(fsc, $"hash:{hash}", fileName); + cancellationToken.ThrowIfCancellationRequested(); + } + + using var message = new HttpRequestMessage(); + message.RequestUri = new Uri($"/api/stream/{projectId}/blob", UriKind.Relative); + message.Method = HttpMethod.Post; + message.Content = new ProgressContent(multipartFormDataContent, progress); + + using var response = await _authorizedClient.SendAsync(message, cancellationToken).ConfigureAwait(false); + + response.EnsureSuccessStatusCode(); + } +}