From 536e58aacc4d2bb21ee89b1d4f451e84ff5598fd Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Fri, 25 Apr 2025 17:45:01 +0100 Subject: [PATCH 1/6] Don't drop items to write when sending fast --- src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs | 7 +++---- src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs | 6 +++--- src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs b/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs index 45a0b93d..1f3efe84 100644 --- a/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs +++ b/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs @@ -8,7 +8,7 @@ namespace Speckle.Sdk.Dependencies.Serialization; public abstract class ChannelSaver where T : IHasByteSize { - private const int SEND_CAPACITY = 500; + private const int SEND_CAPACITY = 5000; private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2); private const int MAX_PARALLELISM_HTTP = 4; @@ -68,14 +68,13 @@ public abstract class ChannelSaver TaskScheduler.Current ); - public void Save(T item) + public async Task SaveAsync(T item, CancellationToken cancellationToken) { if (Exception is not null) { return; //don't save if we're already done through an error } - // ReSharper disable once MethodSupportsCancellation - _checkCacheChannel.Writer.TryWrite(item); + await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false); } private async Task> SendToServer(IMemoryOwner batch) diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs index f797c3c2..d54f2d32 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs @@ -12,7 +12,7 @@ public interface IObjectSaver : IDisposable Task Start(int? maxParallelism, int? httpBatchSize, int? cacheBatchSize, CancellationToken cancellationToken); void DoneTraversing(); Task DoneSaving(); - void SaveItem(BaseItem item); + Task SaveAsync(BaseItem item); long Uploaded { get; } long Cached { get; } } @@ -81,10 +81,10 @@ public sealed class ObjectSaver( } } - public void SaveItem(BaseItem item) + public async Task SaveAsync(BaseItem item) { Interlocked.Increment(ref _objectsSerialized); - Save(item); + await SaveAsync(item, _cancellationTokenSource.Token).ConfigureAwait(false); } public override void SaveToCache(List batch) diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs b/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs index 92bc6478..28fae019 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs @@ -258,7 +258,7 @@ public sealed class SerializeProcess( } Interlocked.Increment(ref _objectsSerialized); - objectSaver.SaveItem(item); + await objectSaver.SaveAsync(item).ConfigureAwait(false); } if (!currentClosures.ContainsKey(item.Id)) From 33c14fc14c3c76466298fceb715af850a8e1483d Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Fri, 25 Apr 2025 18:09:04 +0100 Subject: [PATCH 2/6] Remove extras --- src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs index d54f2d32..028ca478 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs @@ -13,8 +13,6 @@ public interface IObjectSaver : IDisposable void DoneTraversing(); Task DoneSaving(); Task SaveAsync(BaseItem item); - long Uploaded { get; } - long Cached { get; } } public sealed class ObjectSaver( @@ -40,8 +38,6 @@ public sealed class ObjectSaver( private long _cached; private long _objectsSerialized; - public long Cached => _cached; - public long Uploaded => _uploaded; protected override async Task SendToServerInternal(Batch batch) { From d69f0bba2ae40d68fd831840beca4f7d8973e32f Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Fri, 25 Apr 2025 18:13:05 +0100 Subject: [PATCH 3/6] fmt --- src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs index 028ca478..f325e1f4 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSaver.cs @@ -80,7 +80,7 @@ public sealed class ObjectSaver( public async Task SaveAsync(BaseItem item) { Interlocked.Increment(ref _objectsSerialized); - await SaveAsync(item, _cancellationTokenSource.Token).ConfigureAwait(false); + await SaveAsync(item, _cancellationTokenSource.Token).ConfigureAwait(false); } public override void SaveToCache(List batch) From ff390f772d64555f7269719ee9b5987ea8f61b51 Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Fri, 25 Apr 2025 18:24:34 +0100 Subject: [PATCH 4/6] just wait for space instead of another task and reduce size to 1000 --- .../Serialization/ChannelSaver.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs b/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs index 1f3efe84..f340c53c 100644 --- a/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs +++ b/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs @@ -8,7 +8,7 @@ namespace Speckle.Sdk.Dependencies.Serialization; public abstract class ChannelSaver where T : IHasByteSize { - private const int SEND_CAPACITY = 5000; + private const int SEND_CAPACITY = 1000; private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2); private const int MAX_PARALLELISM_HTTP = 4; @@ -74,7 +74,12 @@ public abstract class ChannelSaver { return; //don't save if we're already done through an error } - await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false); + //better wait to handle writes instead of WriteAsync to create unlimited tasks + while (await _checkCacheChannel.Writer.WaitToWriteAsync(cancellationToken).ConfigureAwait(false)) + { + if (_checkCacheChannel.Writer.TryWrite(item)) + return; + } } private async Task> SendToServer(IMemoryOwner batch) From b5b0922e7f2195fa3bc0e7bce5a717c342d11c06 Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Mon, 28 Apr 2025 09:35:02 +0100 Subject: [PATCH 5/6] Revert to write async --- .../Serialization/ChannelSaver.cs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs b/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs index f340c53c..fd7019ba 100644 --- a/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs +++ b/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs @@ -74,12 +74,9 @@ public abstract class ChannelSaver { return; //don't save if we're already done through an error } - //better wait to handle writes instead of WriteAsync to create unlimited tasks - while (await _checkCacheChannel.Writer.WaitToWriteAsync(cancellationToken).ConfigureAwait(false)) - { - if (_checkCacheChannel.Writer.TryWrite(item)) - return; - } + //can switch to check then try pattern when back pressure is needed or exceptions are too much + //the trees don't need to respond to back pressure + await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken); } private async Task> SendToServer(IMemoryOwner batch) From edf63d4a1bda44f5ace8ceaa107bec312de129ea Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Mon, 28 Apr 2025 09:39:46 +0100 Subject: [PATCH 6/6] fix build issue --- src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs b/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs index fd7019ba..89bd97f7 100644 --- a/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs +++ b/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs @@ -76,7 +76,7 @@ public abstract class ChannelSaver } //can switch to check then try pattern when back pressure is needed or exceptions are too much //the trees don't need to respond to back pressure - await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken); + await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false); } private async Task> SendToServer(IMemoryOwner batch)