Compare commits

...

7 Commits

Author SHA1 Message Date
Adam Hathcock 24db4c4ae4 Merge pull request #288 from specklesystems/adam/no-drop-writes
.NET Build and Publish / build (push) Has been cancelled
fix (main) Don't drop items to write when sending fast
2025-04-28 10:18:48 +01:00
Adam Hathcock edf63d4a1b fix build issue 2025-04-28 09:39:46 +01:00
Adam Hathcock b5b0922e7f Revert to write async 2025-04-28 09:35:02 +01:00
Adam Hathcock ff390f772d just wait for space instead of another task and reduce size to 1000 2025-04-25 18:24:34 +01:00
Adam Hathcock d69f0bba2a fmt 2025-04-25 18:13:05 +01:00
Adam Hathcock 33c14fc14c Remove extras 2025-04-25 18:09:04 +01:00
Adam Hathcock 536e58aacc Don't drop items to write when sending fast 2025-04-25 17:45:01 +01:00
3 changed files with 9 additions and 12 deletions
@@ -8,7 +8,7 @@ namespace Speckle.Sdk.Dependencies.Serialization;
public abstract class ChannelSaver<T> public abstract class ChannelSaver<T>
where T : IHasByteSize where T : IHasByteSize
{ {
private const int SEND_CAPACITY = 500; private const int SEND_CAPACITY = 1000;
private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2); private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
private const int MAX_PARALLELISM_HTTP = 4; private const int MAX_PARALLELISM_HTTP = 4;
@@ -68,14 +68,15 @@ public abstract class ChannelSaver<T>
TaskScheduler.Current TaskScheduler.Current
); );
public void Save(T item) public async Task SaveAsync(T item, CancellationToken cancellationToken)
{ {
if (Exception is not null) if (Exception is not null)
{ {
return; //don't save if we're already done through an error return; //don't save if we're already done through an error
} }
// ReSharper disable once MethodSupportsCancellation //can switch to check then try pattern when back pressure is needed or exceptions are too much
_checkCacheChannel.Writer.TryWrite(item); //the trees don't need to respond to back pressure
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
} }
private async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch) private async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch)
@@ -12,9 +12,7 @@ public interface IObjectSaver : IDisposable
Task Start(int? maxParallelism, int? httpBatchSize, int? cacheBatchSize, CancellationToken cancellationToken); Task Start(int? maxParallelism, int? httpBatchSize, int? cacheBatchSize, CancellationToken cancellationToken);
void DoneTraversing(); void DoneTraversing();
Task DoneSaving(); Task DoneSaving();
void SaveItem(BaseItem item); Task SaveAsync(BaseItem item);
long Uploaded { get; }
long Cached { get; }
} }
public sealed class ObjectSaver( public sealed class ObjectSaver(
@@ -40,8 +38,6 @@ public sealed class ObjectSaver(
private long _cached; private long _cached;
private long _objectsSerialized; private long _objectsSerialized;
public long Cached => _cached;
public long Uploaded => _uploaded;
protected override async Task SendToServerInternal(Batch<BaseItem> batch) protected override async Task SendToServerInternal(Batch<BaseItem> batch)
{ {
@@ -81,10 +77,10 @@ public sealed class ObjectSaver(
} }
} }
public void SaveItem(BaseItem item) public async Task SaveAsync(BaseItem item)
{ {
Interlocked.Increment(ref _objectsSerialized); Interlocked.Increment(ref _objectsSerialized);
Save(item); await SaveAsync(item, _cancellationTokenSource.Token).ConfigureAwait(false);
} }
public override void SaveToCache(List<BaseItem> batch) public override void SaveToCache(List<BaseItem> batch)
@@ -258,7 +258,7 @@ public sealed class SerializeProcess(
} }
Interlocked.Increment(ref _objectsSerialized); Interlocked.Increment(ref _objectsSerialized);
objectSaver.SaveItem(item); await objectSaver.SaveAsync(item).ConfigureAwait(false);
} }
if (!currentClosures.ContainsKey(item.Id)) if (!currentClosures.ContainsKey(item.Id))