Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 24db4c4ae4 | |||
| edf63d4a1b | |||
| b5b0922e7f | |||
| ff390f772d | |||
| d69f0bba2a | |||
| 33c14fc14c | |||
| 536e58aacc |
@@ -8,7 +8,7 @@ namespace Speckle.Sdk.Dependencies.Serialization;
|
|||||||
public abstract class ChannelSaver<T>
|
public abstract class ChannelSaver<T>
|
||||||
where T : IHasByteSize
|
where T : IHasByteSize
|
||||||
{
|
{
|
||||||
private const int SEND_CAPACITY = 500;
|
private const int SEND_CAPACITY = 1000;
|
||||||
private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes
|
private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes
|
||||||
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
|
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
|
||||||
private const int MAX_PARALLELISM_HTTP = 4;
|
private const int MAX_PARALLELISM_HTTP = 4;
|
||||||
@@ -68,14 +68,15 @@ public abstract class ChannelSaver<T>
|
|||||||
TaskScheduler.Current
|
TaskScheduler.Current
|
||||||
);
|
);
|
||||||
|
|
||||||
public void Save(T item)
|
public async Task SaveAsync(T item, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
if (Exception is not null)
|
if (Exception is not null)
|
||||||
{
|
{
|
||||||
return; //don't save if we're already done through an error
|
return; //don't save if we're already done through an error
|
||||||
}
|
}
|
||||||
// ReSharper disable once MethodSupportsCancellation
|
//can switch to check then try pattern when back pressure is needed or exceptions are too much
|
||||||
_checkCacheChannel.Writer.TryWrite(item);
|
//the trees don't need to respond to back pressure
|
||||||
|
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch)
|
private async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch)
|
||||||
|
|||||||
@@ -12,9 +12,7 @@ public interface IObjectSaver : IDisposable
|
|||||||
Task Start(int? maxParallelism, int? httpBatchSize, int? cacheBatchSize, CancellationToken cancellationToken);
|
Task Start(int? maxParallelism, int? httpBatchSize, int? cacheBatchSize, CancellationToken cancellationToken);
|
||||||
void DoneTraversing();
|
void DoneTraversing();
|
||||||
Task DoneSaving();
|
Task DoneSaving();
|
||||||
void SaveItem(BaseItem item);
|
Task SaveAsync(BaseItem item);
|
||||||
long Uploaded { get; }
|
|
||||||
long Cached { get; }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed class ObjectSaver(
|
public sealed class ObjectSaver(
|
||||||
@@ -40,8 +38,6 @@ public sealed class ObjectSaver(
|
|||||||
private long _cached;
|
private long _cached;
|
||||||
|
|
||||||
private long _objectsSerialized;
|
private long _objectsSerialized;
|
||||||
public long Cached => _cached;
|
|
||||||
public long Uploaded => _uploaded;
|
|
||||||
|
|
||||||
protected override async Task SendToServerInternal(Batch<BaseItem> batch)
|
protected override async Task SendToServerInternal(Batch<BaseItem> batch)
|
||||||
{
|
{
|
||||||
@@ -81,10 +77,10 @@ public sealed class ObjectSaver(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void SaveItem(BaseItem item)
|
public async Task SaveAsync(BaseItem item)
|
||||||
{
|
{
|
||||||
Interlocked.Increment(ref _objectsSerialized);
|
Interlocked.Increment(ref _objectsSerialized);
|
||||||
Save(item);
|
await SaveAsync(item, _cancellationTokenSource.Token).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public override void SaveToCache(List<BaseItem> batch)
|
public override void SaveToCache(List<BaseItem> batch)
|
||||||
|
|||||||
@@ -258,7 +258,7 @@ public sealed class SerializeProcess(
|
|||||||
}
|
}
|
||||||
|
|
||||||
Interlocked.Increment(ref _objectsSerialized);
|
Interlocked.Increment(ref _objectsSerialized);
|
||||||
objectSaver.SaveItem(item);
|
await objectSaver.SaveAsync(item).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!currentClosures.ContainsKey(item.Id))
|
if (!currentClosures.ContainsKey(item.Id))
|
||||||
|
|||||||
Reference in New Issue
Block a user