Don't drop items to write when sending fast
This commit is contained in:
@@ -8,7 +8,7 @@ namespace Speckle.Sdk.Dependencies.Serialization;
|
||||
public abstract class ChannelSaver<T>
|
||||
where T : IHasByteSize
|
||||
{
|
||||
private const int SEND_CAPACITY = 500;
|
||||
private const int SEND_CAPACITY = 5000;
|
||||
private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes
|
||||
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
|
||||
private const int MAX_PARALLELISM_HTTP = 4;
|
||||
@@ -68,14 +68,13 @@ public abstract class ChannelSaver<T>
|
||||
TaskScheduler.Current
|
||||
);
|
||||
|
||||
public void Save(T item)
|
||||
public async Task SaveAsync(T item, CancellationToken cancellationToken)
|
||||
{
|
||||
if (Exception is not null)
|
||||
{
|
||||
return; //don't save if we're already done through an error
|
||||
}
|
||||
// ReSharper disable once MethodSupportsCancellation
|
||||
_checkCacheChannel.Writer.TryWrite(item);
|
||||
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch)
|
||||
|
||||
@@ -12,7 +12,7 @@ public interface IObjectSaver : IDisposable
|
||||
Task Start(int? maxParallelism, int? httpBatchSize, int? cacheBatchSize, CancellationToken cancellationToken);
|
||||
void DoneTraversing();
|
||||
Task DoneSaving();
|
||||
void SaveItem(BaseItem item);
|
||||
Task SaveAsync(BaseItem item);
|
||||
long Uploaded { get; }
|
||||
long Cached { get; }
|
||||
}
|
||||
@@ -81,10 +81,10 @@ public sealed class ObjectSaver(
|
||||
}
|
||||
}
|
||||
|
||||
public void SaveItem(BaseItem item)
|
||||
public async Task SaveAsync(BaseItem item)
|
||||
{
|
||||
Interlocked.Increment(ref _objectsSerialized);
|
||||
Save(item);
|
||||
await SaveAsync(item, _cancellationTokenSource.Token).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public override void SaveToCache(List<BaseItem> batch)
|
||||
|
||||
@@ -258,7 +258,7 @@ public sealed class SerializeProcess(
|
||||
}
|
||||
|
||||
Interlocked.Increment(ref _objectsSerialized);
|
||||
objectSaver.SaveItem(item);
|
||||
await objectSaver.SaveAsync(item).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (!currentClosures.ContainsKey(item.Id))
|
||||
|
||||
Reference in New Issue
Block a user