Merge branch 'dev' into main-dev
This commit is contained in:
+1
-1
@@ -170,7 +170,7 @@ Target(
|
||||
|
||||
Target(
|
||||
PACK,
|
||||
DependsOn(BUILD),
|
||||
DependsOn(TEST),
|
||||
async () =>
|
||||
{
|
||||
{
|
||||
|
||||
@@ -19,13 +19,13 @@ public static class BatchExtensions
|
||||
public static void AddBatchItem<T>(this IMemoryOwner<T> batch, T item)
|
||||
where T : IHasByteSize => ((Batch<T>)batch).Add(item);
|
||||
|
||||
public static int GetBatchSize<T>(this IMemoryOwner<T> batch, Action<string> logAsWarning, int maxBatchSize)
|
||||
public static int GetBatchSize<T>(this IMemoryOwner<T> batch, int maxBatchSize)
|
||||
where T : IHasByteSize
|
||||
{
|
||||
var currentSize = ((Batch<T>)batch).BatchByteSize;
|
||||
if (currentSize > maxBatchSize)
|
||||
{
|
||||
logAsWarning($"Batch size exceeded. Current size: {currentSize} bytes. Max size: {maxBatchSize} bytes.");
|
||||
//doing this to say it's full since the channel reader only does full being equivalent
|
||||
return maxBatchSize;
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@ public static class ChannelExtensions
|
||||
{
|
||||
public static BatchingChannelReader<T, IMemoryOwner<T>> BatchByByteSize<T>(
|
||||
this ChannelReader<T> source,
|
||||
Action<string> logAsWarning,
|
||||
int batchSize,
|
||||
bool singleReader = false,
|
||||
bool allowSynchronousContinuations = false
|
||||
@@ -16,7 +15,6 @@ public static class ChannelExtensions
|
||||
where T : IHasByteSize =>
|
||||
new SizeBatchingChannelReader<T>(
|
||||
source ?? throw new ArgumentNullException(nameof(source)),
|
||||
logAsWarning,
|
||||
batchSize,
|
||||
singleReader,
|
||||
allowSynchronousContinuations
|
||||
|
||||
@@ -5,7 +5,7 @@ using Speckle.Sdk.Serialisation.V2.Send;
|
||||
|
||||
namespace Speckle.Sdk.Dependencies.Serialization;
|
||||
|
||||
public abstract class ChannelSaver<T>(Action<string> logAsWarning, CancellationToken cancellationToken)
|
||||
public abstract class ChannelSaver<T>
|
||||
where T : IHasByteSize
|
||||
{
|
||||
private const int SEND_CAPACITY = 500;
|
||||
@@ -16,8 +16,6 @@ public abstract class ChannelSaver<T>(Action<string> logAsWarning, CancellationT
|
||||
private const int MAX_CACHE_WRITE_PARALLELISM = 4;
|
||||
private const int MAX_CACHE_BATCH = 500;
|
||||
|
||||
private readonly CancellationTokenSource _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
|
||||
private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
|
||||
new BoundedChannelOptions(SEND_CAPACITY)
|
||||
{
|
||||
@@ -30,26 +28,26 @@ public abstract class ChannelSaver<T>(Action<string> logAsWarning, CancellationT
|
||||
_ => throw new NotImplementedException("Dropping items not supported.")
|
||||
);
|
||||
|
||||
public Task Start() =>
|
||||
public Task Start(CancellationToken cancellationToken) =>
|
||||
_checkCacheChannel
|
||||
.Reader.BatchByByteSize(logAsWarning, HTTP_SEND_CHUNK_SIZE)
|
||||
.Reader.BatchByByteSize(HTTP_SEND_CHUNK_SIZE)
|
||||
.WithTimeout(HTTP_BATCH_TIMEOUT)
|
||||
.PipeAsync(
|
||||
MAX_PARALLELISM_HTTP,
|
||||
async x => await SendToServer(x).ConfigureAwait(false),
|
||||
HTTP_CAPACITY,
|
||||
false,
|
||||
_cts.Token
|
||||
cancellationToken
|
||||
)
|
||||
.Join()
|
||||
.Batch(MAX_CACHE_BATCH)
|
||||
.WithTimeout(HTTP_BATCH_TIMEOUT)
|
||||
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, _cts.Token)
|
||||
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
|
||||
.ContinueWith(
|
||||
t =>
|
||||
{
|
||||
Exception? ex = t.Exception;
|
||||
if (ex is null && t.Status is TaskStatus.Canceled && !_cts.Token.IsCancellationRequested)
|
||||
if (ex is null && t.Status is TaskStatus.Canceled && !cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
ex = new OperationCanceledException();
|
||||
}
|
||||
@@ -60,18 +58,19 @@ public abstract class ChannelSaver<T>(Action<string> logAsWarning, CancellationT
|
||||
}
|
||||
_checkCacheChannel.Writer.TryComplete(ex);
|
||||
},
|
||||
_cts.Token,
|
||||
cancellationToken,
|
||||
TaskContinuationOptions.ExecuteSynchronously,
|
||||
TaskScheduler.Current
|
||||
);
|
||||
|
||||
public async ValueTask Save(T item)
|
||||
public void Save(T item, CancellationToken cancellationToken)
|
||||
{
|
||||
if (Exception is not null || _cts.IsCancellationRequested)
|
||||
if (Exception is not null || cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
return; //don't save if we're already done through an error
|
||||
}
|
||||
await _checkCacheChannel.Writer.WriteAsync(item).ConfigureAwait(false);
|
||||
// ReSharper disable once MethodSupportsCancellation
|
||||
_checkCacheChannel.Writer.TryWrite(item);
|
||||
}
|
||||
|
||||
private async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch)
|
||||
@@ -124,7 +123,5 @@ public abstract class ChannelSaver<T>(Action<string> logAsWarning, CancellationT
|
||||
{
|
||||
Exception = ex;
|
||||
_checkCacheChannel.Writer.TryComplete(ex);
|
||||
//cancel everything!
|
||||
_cts.Cancel();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,6 @@ public interface IHasByteSize
|
||||
|
||||
public sealed class SizeBatchingChannelReader<T>(
|
||||
ChannelReader<T> source,
|
||||
Action<string> logAsWarning,
|
||||
int batchSize,
|
||||
bool singleReader,
|
||||
bool syncCont = false
|
||||
@@ -34,5 +33,5 @@ public sealed class SizeBatchingChannelReader<T>(
|
||||
|
||||
protected override void AddBatchItem(IMemoryOwner<T> batch, T item) => batch.AddBatchItem(item);
|
||||
|
||||
protected override int GetBatchSize(IMemoryOwner<T> batch) => batch.GetBatchSize(logAsWarning, _batchSize);
|
||||
protected override int GetBatchSize(IMemoryOwner<T> batch) => batch.GetBatchSize(_batchSize);
|
||||
}
|
||||
|
||||
@@ -6,8 +6,7 @@ namespace Speckle.Sdk.Serialisation.V2;
|
||||
public sealed class PriorityScheduler(
|
||||
ILogger<PriorityScheduler> logger,
|
||||
ThreadPriority priority,
|
||||
int maximumConcurrencyLevel,
|
||||
CancellationToken cancellationToken
|
||||
int maximumConcurrencyLevel
|
||||
) : TaskScheduler, IAsyncDisposable
|
||||
{
|
||||
private readonly BlockingCollection<Task> _tasks = new();
|
||||
@@ -56,7 +55,7 @@ public sealed class PriorityScheduler(
|
||||
while (true)
|
||||
{
|
||||
//we're done so leave
|
||||
if (_tasks.IsCompleted || cancellationToken.IsCancellationRequested)
|
||||
if (_tasks.IsCompleted)
|
||||
{
|
||||
break;
|
||||
}
|
||||
@@ -66,11 +65,6 @@ public sealed class PriorityScheduler(
|
||||
{
|
||||
break;
|
||||
}
|
||||
//cancelled just leave
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
//didn't get a task but just timed out so continue
|
||||
if (!success)
|
||||
{
|
||||
|
||||
@@ -54,8 +54,7 @@ public sealed class DeserializeProcess(
|
||||
private readonly PriorityScheduler _belowNormal = new(
|
||||
loggerFactory.CreateLogger<PriorityScheduler>(),
|
||||
ThreadPriority.BelowNormal,
|
||||
Environment.ProcessorCount * 2,
|
||||
cancellationToken
|
||||
Environment.ProcessorCount * 2
|
||||
);
|
||||
|
||||
private readonly DeserializeProcessOptions _options = options ?? new();
|
||||
|
||||
@@ -35,21 +35,20 @@ public sealed class SerializeProcess(
|
||||
ILoggerFactory loggerFactory,
|
||||
CancellationToken cancellationToken,
|
||||
SerializeProcessOptions? options = null
|
||||
#pragma warning disable CS9107
|
||||
#pragma warning disable CA2254
|
||||
)
|
||||
: ChannelSaver<BaseItem>(x => loggerFactory.CreateLogger<SerializeProcess>().LogWarning(x), cancellationToken),
|
||||
ISerializeProcess
|
||||
#pragma warning restore CA2254
|
||||
#pragma warning restore CS9107
|
||||
) : ChannelSaver<BaseItem>, ISerializeProcess
|
||||
{
|
||||
private static readonly Dictionary<Id, NodeInfo> EMPTY_CLOSURES = new();
|
||||
|
||||
private readonly CancellationTokenSource _processSource = CancellationTokenSource.CreateLinkedTokenSource(
|
||||
cancellationToken
|
||||
);
|
||||
|
||||
//async dispose
|
||||
[SuppressMessage("Usage", "CA2213:Disposable fields should be disposed")]
|
||||
private readonly PriorityScheduler _highest = new(
|
||||
loggerFactory.CreateLogger<PriorityScheduler>(),
|
||||
ThreadPriority.Highest,
|
||||
2,
|
||||
cancellationToken
|
||||
2
|
||||
);
|
||||
|
||||
//async dispose
|
||||
@@ -57,10 +56,8 @@ public sealed class SerializeProcess(
|
||||
private readonly PriorityScheduler _belowNormal = new(
|
||||
loggerFactory.CreateLogger<PriorityScheduler>(),
|
||||
ThreadPriority.BelowNormal,
|
||||
Environment.ProcessorCount * 2,
|
||||
cancellationToken
|
||||
Environment.ProcessorCount * 2
|
||||
);
|
||||
|
||||
private readonly SerializeProcessOptions _options = options ?? new();
|
||||
private readonly ILogger<SerializeProcess> _logger = loggerFactory.CreateLogger<SerializeProcess>();
|
||||
|
||||
@@ -85,16 +82,17 @@ public sealed class SerializeProcess(
|
||||
await _highest.DisposeAsync().ConfigureAwait(false);
|
||||
await _belowNormal.DisposeAsync().ConfigureAwait(false);
|
||||
sqLiteJsonCacheManager.Dispose();
|
||||
_processSource.Dispose();
|
||||
}
|
||||
|
||||
public void ThrowIfFailed()
|
||||
private void ThrowIfFailed()
|
||||
{
|
||||
//always check for cancellation first
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
//order here matters...null with cancellation means a user did it, otherwise it's a real Exception
|
||||
if (Exception is not null)
|
||||
{
|
||||
throw new SpeckleException("Error while sending", Exception);
|
||||
}
|
||||
_processSource.Token.ThrowIfCancellationRequested();
|
||||
}
|
||||
|
||||
private async Task WaitForSchedulerCompletion()
|
||||
@@ -107,20 +105,21 @@ public sealed class SerializeProcess(
|
||||
{
|
||||
try
|
||||
{
|
||||
var channelTask = Start();
|
||||
var channelTask = Start(_processSource.Token);
|
||||
var findTotalObjectsTask = Task.CompletedTask;
|
||||
if (!_options.SkipFindTotalObjects)
|
||||
{
|
||||
ThrowIfFailed();
|
||||
findTotalObjectsTask = Task.Factory.StartNew(
|
||||
() => TraverseTotal(root),
|
||||
cancellationToken,
|
||||
_processSource.Token,
|
||||
TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness,
|
||||
_highest
|
||||
);
|
||||
}
|
||||
|
||||
await Traverse(root).ConfigureAwait(false);
|
||||
await Traverse(root, _processSource.Token).ConfigureAwait(false);
|
||||
ThrowIfFailed();
|
||||
DoneTraversing();
|
||||
await Task.WhenAll(findTotalObjectsTask, channelTask).ConfigureAwait(false);
|
||||
ThrowIfFailed();
|
||||
@@ -139,6 +138,10 @@ public sealed class SerializeProcess(
|
||||
|
||||
private void TraverseTotal(Base obj)
|
||||
{
|
||||
if (_processSource.Token.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
foreach (var child in baseChildFinder.GetChildren(obj))
|
||||
{
|
||||
_objectsFound++;
|
||||
@@ -147,76 +150,159 @@ public sealed class SerializeProcess(
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj)
|
||||
private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj, CancellationToken token)
|
||||
{
|
||||
var tasks = new List<Task<Dictionary<Id, NodeInfo>>>();
|
||||
foreach (var child in baseChildFinder.GetChildren(obj))
|
||||
if (token.IsCancellationRequested)
|
||||
{
|
||||
// tmp is necessary because of the way closures close over loop variables
|
||||
var tmp = child;
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
var t = Task
|
||||
.Factory.StartNew(
|
||||
async () => await Traverse(tmp).ConfigureAwait(false),
|
||||
cancellationToken,
|
||||
TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness,
|
||||
_belowNormal
|
||||
)
|
||||
.Unwrap();
|
||||
tasks.Add(t);
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
|
||||
Dictionary<Id, NodeInfo>[] taskClosures = [];
|
||||
if (tasks.Count > 0)
|
||||
try
|
||||
{
|
||||
taskClosures = await Task.WhenAll(tasks).ConfigureAwait(false);
|
||||
}
|
||||
var childClosures = _childClosurePool.Get();
|
||||
foreach (var childClosure in taskClosures)
|
||||
{
|
||||
foreach (var kvp in childClosure)
|
||||
var tasks = new List<Task<Dictionary<Id, NodeInfo>>>();
|
||||
using var childCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
|
||||
foreach (var child in baseChildFinder.GetChildren(obj))
|
||||
{
|
||||
childClosures[kvp.Key] = kvp.Value;
|
||||
}
|
||||
_currentClosurePool.Return(childClosure);
|
||||
}
|
||||
|
||||
var items = baseSerializer.Serialise(obj, childClosures, _options.SkipCacheRead, cancellationToken);
|
||||
|
||||
var currentClosures = _currentClosurePool.Get();
|
||||
Interlocked.Increment(ref _objectCount);
|
||||
progress?.Report(new(ProgressEvent.FromCacheOrSerialized, _objectCount, Math.Max(_objectCount, _objectsFound)));
|
||||
foreach (var item in items)
|
||||
{
|
||||
if (item.NeedsStorage)
|
||||
{
|
||||
Interlocked.Increment(ref _objectsSerialized);
|
||||
await Save(item).ConfigureAwait(false);
|
||||
// tmp is necessary because of the way closures close over loop variables
|
||||
var tmp = child;
|
||||
if (token.IsCancellationRequested)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
var t = Task
|
||||
.Factory.StartNew(
|
||||
// ReSharper disable once AccessToDisposedClosure
|
||||
// don't need to capture here
|
||||
async () => await Traverse(tmp, childCancellationTokenSource.Token).ConfigureAwait(false),
|
||||
childCancellationTokenSource.Token,
|
||||
TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness,
|
||||
_belowNormal
|
||||
)
|
||||
.Unwrap();
|
||||
tasks.Add(t);
|
||||
}
|
||||
|
||||
if (!currentClosures.ContainsKey(item.Id))
|
||||
if (token.IsCancellationRequested)
|
||||
{
|
||||
currentClosures.Add(item.Id, new NodeInfo(item.Json, item.Closures));
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
|
||||
List<Dictionary<Id, NodeInfo>> taskClosures = new();
|
||||
if (tasks.Count > 0)
|
||||
{
|
||||
var currentTasks = tasks.ToList();
|
||||
do
|
||||
{
|
||||
//grab when any Task is done and see if we're cancelling
|
||||
var t = await Task.WhenAny(currentTasks).ConfigureAwait(false);
|
||||
if (t.IsCanceled)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
if (t.IsFaulted)
|
||||
{
|
||||
if (t.Exception is not null)
|
||||
{
|
||||
RecordException(t.Exception);
|
||||
}
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
taskClosures.Add(t.Result);
|
||||
currentTasks.Remove(t);
|
||||
} while (currentTasks.Count > 0);
|
||||
}
|
||||
|
||||
if (token.IsCancellationRequested)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
|
||||
var childClosures = _childClosurePool.Get();
|
||||
foreach (var childClosure in taskClosures)
|
||||
{
|
||||
foreach (var kvp in childClosure)
|
||||
{
|
||||
childClosures[kvp.Key] = kvp.Value;
|
||||
}
|
||||
|
||||
_currentClosurePool.Return(childClosure);
|
||||
}
|
||||
|
||||
if (token.IsCancellationRequested)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
|
||||
var items = baseSerializer.Serialise(obj, childClosures, _options.SkipCacheRead, token);
|
||||
if (token.IsCancellationRequested)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
|
||||
var currentClosures = _currentClosurePool.Get();
|
||||
try
|
||||
{
|
||||
Interlocked.Increment(ref _objectCount);
|
||||
progress?.Report(new(ProgressEvent.FromCacheOrSerialized, _objectCount, Math.Max(_objectCount, _objectsFound)));
|
||||
foreach (var item in items)
|
||||
{
|
||||
if (item.NeedsStorage)
|
||||
{
|
||||
if (token.IsCancellationRequested)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
|
||||
Interlocked.Increment(ref _objectsSerialized);
|
||||
Save(item, childCancellationTokenSource.Token);
|
||||
}
|
||||
|
||||
if (!currentClosures.ContainsKey(item.Id))
|
||||
{
|
||||
currentClosures.Add(item.Id, new NodeInfo(item.Json, item.Closures));
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_childClosurePool.Return(childClosures);
|
||||
}
|
||||
|
||||
return currentClosures;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
#pragma warning disable CA1031
|
||||
catch (Exception e)
|
||||
#pragma warning restore CA1031
|
||||
{
|
||||
RecordException(e);
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
_childClosurePool.Return(childClosures);
|
||||
return currentClosures;
|
||||
}
|
||||
|
||||
protected override async Task SendToServerInternal(Batch<BaseItem> batch)
|
||||
{
|
||||
if (_processSource.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
try
|
||||
{
|
||||
if (!_options.SkipServer && batch.Items.Count != 0)
|
||||
{
|
||||
var objectBatch = batch.Items.Distinct().ToList();
|
||||
var hasObjects = await serverObjectManager
|
||||
.HasObjects(objectBatch.Select(x => x.Id.Value).Freeze(), cancellationToken)
|
||||
.HasObjects(objectBatch.Select(x => x.Id.Value).Freeze(), _processSource.Token)
|
||||
.ConfigureAwait(false);
|
||||
objectBatch = batch.Items.Where(x => !hasObjects[x.Id.Value]).ToList();
|
||||
if (objectBatch.Count != 0)
|
||||
{
|
||||
await serverObjectManager.UploadObjects(objectBatch, true, progress, cancellationToken).ConfigureAwait(false);
|
||||
await serverObjectManager
|
||||
.UploadObjects(objectBatch, true, progress, _processSource.Token)
|
||||
.ConfigureAwait(false);
|
||||
Interlocked.Exchange(ref _uploaded, _uploaded + batch.Items.Count);
|
||||
}
|
||||
|
||||
@@ -225,19 +311,22 @@ public sealed class SerializeProcess(
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
_processSource.Cancel();
|
||||
}
|
||||
#pragma warning disable CA1031
|
||||
catch (Exception e)
|
||||
#pragma warning restore CA1031
|
||||
{
|
||||
_logger.LogError(e, "Error sending objects to server");
|
||||
throw;
|
||||
RecordException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public override void SaveToCache(List<BaseItem> batch)
|
||||
{
|
||||
if (_processSource.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
try
|
||||
{
|
||||
if (!_options.SkipCacheWrite && batch.Count != 0)
|
||||
@@ -249,14 +338,21 @@ public sealed class SerializeProcess(
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
_processSource.Cancel();
|
||||
}
|
||||
#pragma warning disable CA1031
|
||||
catch (Exception e)
|
||||
#pragma warning restore CA1031
|
||||
{
|
||||
_logger.LogError(e, "Error sending objects to server");
|
||||
throw;
|
||||
RecordException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void RecordException(Exception e)
|
||||
{
|
||||
//order here matters
|
||||
_logger.LogError(e, "Error in SDK");
|
||||
Exception = e;
|
||||
_processSource.Cancel();
|
||||
}
|
||||
}
|
||||
|
||||
+12
-2
@@ -1,5 +1,15 @@
|
||||
{
|
||||
"Type": "System.OperationCanceledException",
|
||||
"CancellationToken": {
|
||||
"IsCancellationRequested": true,
|
||||
"CanBeCanceled": true,
|
||||
"WaitHandle": {
|
||||
"SafeWaitHandle": {
|
||||
"IsClosed": false,
|
||||
"IsInvalid": false
|
||||
}
|
||||
}
|
||||
},
|
||||
"Data": {},
|
||||
"Message": "The operation was canceled.",
|
||||
"Source": "System.Private.CoreLib"
|
||||
"Type": "OperationCanceledException"
|
||||
}
|
||||
|
||||
+12
-2
@@ -1,5 +1,15 @@
|
||||
{
|
||||
"Type": "System.OperationCanceledException",
|
||||
"CancellationToken": {
|
||||
"IsCancellationRequested": true,
|
||||
"CanBeCanceled": true,
|
||||
"WaitHandle": {
|
||||
"SafeWaitHandle": {
|
||||
"IsClosed": false,
|
||||
"IsInvalid": false
|
||||
}
|
||||
}
|
||||
},
|
||||
"Data": {},
|
||||
"Message": "The operation was canceled.",
|
||||
"Source": "System.Private.CoreLib"
|
||||
"Type": "OperationCanceledException"
|
||||
}
|
||||
|
||||
+12
-2
@@ -1,5 +1,15 @@
|
||||
{
|
||||
"Type": "System.OperationCanceledException",
|
||||
"CancellationToken": {
|
||||
"IsCancellationRequested": true,
|
||||
"CanBeCanceled": true,
|
||||
"WaitHandle": {
|
||||
"SafeWaitHandle": {
|
||||
"IsClosed": false,
|
||||
"IsInvalid": false
|
||||
}
|
||||
}
|
||||
},
|
||||
"Data": {},
|
||||
"Message": "The operation was canceled.",
|
||||
"Source": "System.Private.CoreLib"
|
||||
"Type": "OperationCanceledException"
|
||||
}
|
||||
|
||||
+12
-2
@@ -1,5 +1,15 @@
|
||||
{
|
||||
"Type": "System.OperationCanceledException",
|
||||
"CancellationToken": {
|
||||
"IsCancellationRequested": true,
|
||||
"CanBeCanceled": true,
|
||||
"WaitHandle": {
|
||||
"SafeWaitHandle": {
|
||||
"IsClosed": false,
|
||||
"IsInvalid": false
|
||||
}
|
||||
}
|
||||
},
|
||||
"Data": {},
|
||||
"Message": "The operation was canceled.",
|
||||
"Source": "System.Private.CoreLib"
|
||||
"Type": "OperationCanceledException"
|
||||
}
|
||||
|
||||
+12
-2
@@ -1,5 +1,15 @@
|
||||
{
|
||||
"Type": "System.OperationCanceledException",
|
||||
"CancellationToken": {
|
||||
"IsCancellationRequested": true,
|
||||
"CanBeCanceled": true,
|
||||
"WaitHandle": {
|
||||
"SafeWaitHandle": {
|
||||
"IsClosed": false,
|
||||
"IsInvalid": false
|
||||
}
|
||||
}
|
||||
},
|
||||
"Data": {},
|
||||
"Message": "The operation was canceled.",
|
||||
"Source": "System.Private.CoreLib"
|
||||
"Type": "OperationCanceledException"
|
||||
}
|
||||
|
||||
+12
-2
@@ -1,5 +1,15 @@
|
||||
{
|
||||
"Type": "System.OperationCanceledException",
|
||||
"CancellationToken": {
|
||||
"IsCancellationRequested": true,
|
||||
"CanBeCanceled": true,
|
||||
"WaitHandle": {
|
||||
"SafeWaitHandle": {
|
||||
"IsClosed": false,
|
||||
"IsInvalid": false
|
||||
}
|
||||
}
|
||||
},
|
||||
"Data": {},
|
||||
"Message": "The operation was canceled.",
|
||||
"Source": "System.Private.CoreLib"
|
||||
"Type": "OperationCanceledException"
|
||||
}
|
||||
|
||||
+8
-2
@@ -1,5 +1,11 @@
|
||||
{
|
||||
"Type": "Speckle.Sdk.SpeckleException",
|
||||
"Data": {},
|
||||
"InnerException": {
|
||||
"$type": "NotImplementedException",
|
||||
"Data": {},
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Type": "NotImplementedException"
|
||||
},
|
||||
"Message": "Error while sending",
|
||||
"Source": "Speckle.Sdk"
|
||||
"Type": "SpeckleException"
|
||||
}
|
||||
|
||||
+11
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"Data": {},
|
||||
"InnerException": {
|
||||
"$type": "NotImplementedException",
|
||||
"Data": {},
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Type": "NotImplementedException"
|
||||
},
|
||||
"Message": "Error while sending",
|
||||
"Type": "SpeckleException"
|
||||
}
|
||||
+8
-2
@@ -1,5 +1,11 @@
|
||||
{
|
||||
"Type": "Speckle.Sdk.SpeckleException",
|
||||
"Data": {},
|
||||
"InnerException": {
|
||||
"$type": "NotImplementedException",
|
||||
"Data": {},
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Type": "NotImplementedException"
|
||||
},
|
||||
"Message": "Error while sending",
|
||||
"Source": "Speckle.Sdk"
|
||||
"Type": "SpeckleException"
|
||||
}
|
||||
|
||||
+2
-2
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"Type": "System.NotImplementedException",
|
||||
"Data": {},
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Source": "Speckle.Sdk.Serialization.Tests"
|
||||
"Type": "NotImplementedException"
|
||||
}
|
||||
|
||||
+2
-2
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"Type": "System.NotImplementedException",
|
||||
"Data": {},
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Source": "Speckle.Sdk.Serialization.Tests"
|
||||
"Type": "NotImplementedException"
|
||||
}
|
||||
|
||||
+2
-2
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"Type": "Speckle.Sdk.SpeckleException",
|
||||
"Data": {},
|
||||
"Message": "Cannot skip server and cache. Please choose one.",
|
||||
"Source": "Speckle.Sdk"
|
||||
"Type": "SpeckleException"
|
||||
}
|
||||
|
||||
+8
-2
@@ -1,5 +1,11 @@
|
||||
{
|
||||
"Type": "Speckle.Sdk.SpeckleException",
|
||||
"Data": {},
|
||||
"InnerException": {
|
||||
"$type": "NotImplementedException",
|
||||
"Data": {},
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Type": "NotImplementedException"
|
||||
},
|
||||
"Message": "Error while sending",
|
||||
"Source": "Speckle.Sdk"
|
||||
"Type": "SpeckleException"
|
||||
}
|
||||
|
||||
@@ -61,6 +61,27 @@ public class ExceptionTests
|
||||
await Verify(ex);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Test_Exceptions_Cache_ExceptionsAfter_10()
|
||||
{
|
||||
var testClass = new TestClass() { RegularProperty = "Hello" };
|
||||
|
||||
var jsonManager = new ExceptionSendCacheManager(exceptionsAfter: 10);
|
||||
await using var process2 = new SerializeProcess(
|
||||
null,
|
||||
jsonManager,
|
||||
new DummyServerObjectManager(),
|
||||
new BaseChildFinder(new BasePropertyGatherer()),
|
||||
new BaseSerializer(jsonManager, new ObjectSerializerFactory(new BasePropertyGatherer())),
|
||||
new NullLoggerFactory(),
|
||||
default,
|
||||
new SerializeProcessOptions(false, false, false, true)
|
||||
);
|
||||
|
||||
var ex = await Assert.ThrowsAsync<SpeckleException>(async () => await process2.Serialize(testClass));
|
||||
await Verify(ex);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Test_Exceptions_Receive_Server_Skip_Both()
|
||||
{
|
||||
|
||||
@@ -2,21 +2,40 @@
|
||||
|
||||
namespace Speckle.Sdk.Serialization.Tests.Framework;
|
||||
|
||||
public class ExceptionSendCacheManager(bool? hasObject = null) : ISqLiteJsonCacheManager
|
||||
public class ExceptionSendCacheManager(bool? hasObject = null, int? exceptionsAfter = null) : ISqLiteJsonCacheManager
|
||||
{
|
||||
private int _count;
|
||||
|
||||
public void Dispose() { }
|
||||
|
||||
public IReadOnlyCollection<(string Id, string Json)> GetAllObjects() => throw new NotImplementedException();
|
||||
|
||||
public void DeleteObject(string id) => throw new NotImplementedException();
|
||||
public void DeleteObject(string id) => CheckExceptions();
|
||||
|
||||
public string? GetObject(string id) => null;
|
||||
|
||||
public void SaveObject(string id, string json) => throw new NotImplementedException();
|
||||
public void SaveObject(string id, string json) => CheckExceptions();
|
||||
|
||||
public void UpdateObject(string id, string json) => throw new NotImplementedException();
|
||||
public void UpdateObject(string id, string json) => CheckExceptions();
|
||||
|
||||
public void SaveObjects(IEnumerable<(string id, string json)> items) => throw new NotImplementedException();
|
||||
public void SaveObjects(IEnumerable<(string id, string json)> items) => CheckExceptions();
|
||||
|
||||
public bool HasObject(string objectId) => hasObject ?? throw new NotImplementedException();
|
||||
|
||||
private void CheckExceptions()
|
||||
{
|
||||
if (exceptionsAfter is not null)
|
||||
{
|
||||
if (exceptionsAfter.Value > _count)
|
||||
{
|
||||
_count++;
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new Exception("Count exceeded");
|
||||
}
|
||||
}
|
||||
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,32 +0,0 @@
|
||||
using Speckle.Sdk.Common;
|
||||
|
||||
namespace Speckle.Sdk.Testing.Framework;
|
||||
|
||||
public class AggregationExceptionScrubber : WriteOnlyJsonConverter<AggregateException>
|
||||
{
|
||||
private static readonly ExceptionScrubber _innerScrubber = new();
|
||||
|
||||
public override void Write(VerifyJsonWriter writer, AggregateException exception)
|
||||
{
|
||||
writer.WriteStartObject();
|
||||
|
||||
writer.WriteMember(exception, exception.GetType().FullName, "Type");
|
||||
if (exception.InnerExceptions.Count == 1)
|
||||
{
|
||||
writer.WritePropertyName("InnerException");
|
||||
_innerScrubber.Write(writer, exception.InnerException.NotNull());
|
||||
}
|
||||
else
|
||||
{
|
||||
writer.WritePropertyName("InnerExceptions");
|
||||
writer.WriteStartArray();
|
||||
foreach (var innerException in exception.InnerExceptions)
|
||||
{
|
||||
_innerScrubber.Write(writer, innerException);
|
||||
}
|
||||
writer.WriteEndArray();
|
||||
}
|
||||
|
||||
writer.WriteEndObject();
|
||||
}
|
||||
}
|
||||
@@ -1,18 +0,0 @@
|
||||
using Argon;
|
||||
|
||||
namespace Speckle.Sdk.Testing.Framework;
|
||||
|
||||
public class ExceptionScrubber : WriteOnlyJsonConverter<Exception>
|
||||
{
|
||||
public override void Write(VerifyJsonWriter writer, Exception value)
|
||||
{
|
||||
var ex = new JObject
|
||||
{
|
||||
["Type"] = value.GetType().FullName,
|
||||
["Message"] = value.Message,
|
||||
["Source"] = value.Source?.Trim(),
|
||||
};
|
||||
//intentionally removed stacktrace to avoid errors on different machines and line numbers
|
||||
writer.WriteRawValue(ex.ToString(Formatting.Indented));
|
||||
}
|
||||
}
|
||||
@@ -1,24 +0,0 @@
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using Argon;
|
||||
using Speckle.Sdk.Common;
|
||||
using Speckle.Sdk.Serialisation;
|
||||
|
||||
namespace Speckle.Sdk.Testing.Framework;
|
||||
|
||||
[SuppressMessage("Design", "CA1062:Validate arguments of public methods")]
|
||||
public class IdStringSerializer : JsonConverter
|
||||
{
|
||||
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
|
||||
{
|
||||
var id = (Id)value;
|
||||
writer.WriteValue(id.Value);
|
||||
}
|
||||
|
||||
public override object? ReadJson(JsonReader reader, Type type, object? existingValue, JsonSerializer serializer)
|
||||
{
|
||||
var json = reader.ReadAsString();
|
||||
return new Id(json.NotNull());
|
||||
}
|
||||
|
||||
public override bool CanConvert(Type type) => typeof(Id) == type;
|
||||
}
|
||||
@@ -25,13 +25,7 @@ public static class SpeckleVerify
|
||||
VerifierSettings.DontIgnoreEmptyCollections();
|
||||
VerifierSettings.SortPropertiesAlphabetically();
|
||||
VerifierSettings.SortJsonObjects();
|
||||
VerifierSettings.AddExtraSettings(x =>
|
||||
{
|
||||
var existing = x.Converters.OfType<WriteOnlyJsonConverter<AggregateException>>().First();
|
||||
x.Converters.Remove(existing);
|
||||
x.Converters.Add(new AggregationExceptionScrubber());
|
||||
x.Converters.Add(new ExceptionScrubber());
|
||||
});
|
||||
VerifierSettings.IgnoreStackTrace();
|
||||
VerifyQuibble.Initialize();
|
||||
}
|
||||
|
||||
|
||||
@@ -11,8 +11,6 @@ public class BatchTests
|
||||
public int ByteSize { get; } = size;
|
||||
}
|
||||
|
||||
private static readonly Action<string> EMPTY_LOGGER = _ => { };
|
||||
|
||||
[Fact]
|
||||
public void TestBatchSize_Calc()
|
||||
{
|
||||
@@ -23,16 +21,6 @@ public class BatchTests
|
||||
batch.BatchByteSize.Should().Be(3);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Ensure_logging()
|
||||
{
|
||||
using var batch = new Batch<BatchItem>();
|
||||
batch.AddBatchItem(new BatchItem(2));
|
||||
bool called = false;
|
||||
batch.GetBatchSize(x => called = true, 1);
|
||||
called.Should().BeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void TestBatchSize_Trim()
|
||||
{
|
||||
@@ -75,13 +63,13 @@ public class BatchTests
|
||||
|
||||
using var batch = BatchExtensions.CreateBatch<BatchItem>();
|
||||
batch.AddBatchItem(new BatchItem(2));
|
||||
bool full = batch.GetBatchSize(EMPTY_LOGGER, MAX_BATCH_SIZE) == MAX_BATCH_SIZE;
|
||||
bool full = batch.GetBatchSize(MAX_BATCH_SIZE) == MAX_BATCH_SIZE;
|
||||
full.Should().BeFalse();
|
||||
batch.AddBatchItem(new BatchItem(2));
|
||||
full = batch.GetBatchSize(EMPTY_LOGGER, MAX_BATCH_SIZE) == MAX_BATCH_SIZE;
|
||||
full = batch.GetBatchSize(MAX_BATCH_SIZE) == MAX_BATCH_SIZE;
|
||||
full.Should().BeFalse();
|
||||
batch.AddBatchItem(new BatchItem(2));
|
||||
full = batch.GetBatchSize(EMPTY_LOGGER, MAX_BATCH_SIZE) == MAX_BATCH_SIZE;
|
||||
full = batch.GetBatchSize(MAX_BATCH_SIZE) == MAX_BATCH_SIZE;
|
||||
full.Should().BeTrue();
|
||||
}
|
||||
|
||||
@@ -92,7 +80,7 @@ public class BatchTests
|
||||
|
||||
using var batch = BatchExtensions.CreateBatch<BatchItem>();
|
||||
batch.AddBatchItem(new BatchItem(63));
|
||||
bool full = batch.GetBatchSize(EMPTY_LOGGER, MAX_BATCH_SIZE) == MAX_BATCH_SIZE;
|
||||
bool full = batch.GetBatchSize(MAX_BATCH_SIZE) == MAX_BATCH_SIZE;
|
||||
full.Should().BeTrue();
|
||||
}
|
||||
|
||||
@@ -103,10 +91,10 @@ public class BatchTests
|
||||
|
||||
using var batch = BatchExtensions.CreateBatch<BatchItem>();
|
||||
batch.AddBatchItem(new BatchItem(2));
|
||||
bool full = batch.GetBatchSize(EMPTY_LOGGER, MAX_BATCH_SIZE) == MAX_BATCH_SIZE;
|
||||
bool full = batch.GetBatchSize(MAX_BATCH_SIZE) == MAX_BATCH_SIZE;
|
||||
full.Should().BeFalse();
|
||||
batch.AddBatchItem(new BatchItem(63));
|
||||
full = batch.GetBatchSize(EMPTY_LOGGER, MAX_BATCH_SIZE) == MAX_BATCH_SIZE;
|
||||
full = batch.GetBatchSize(MAX_BATCH_SIZE) == MAX_BATCH_SIZE;
|
||||
full.Should().BeTrue();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user