fix: an exception in the underlying code results in UnobservedTaskExceptions and not ending of the send process. (#237)
* Cancel all channels when first exception happens then throw that exception * Use single exception to end things. * fix verifications * fmt * Fix tests as stacktrace was removed * Handle one exception and cancel on receive * moved ThrowIfFailed and made throw speckle exceptions * Fixed tests * fmt
This commit is contained in:
@@ -3,7 +3,7 @@ using Open.ChannelExtensions;
|
||||
|
||||
namespace Speckle.Sdk.Dependencies.Serialization;
|
||||
|
||||
public abstract class ChannelLoader<T>
|
||||
public abstract class ChannelLoader<T>(CancellationToken cancellationToken)
|
||||
{
|
||||
private const int RECEIVE_CAPACITY = 5000;
|
||||
|
||||
@@ -13,7 +13,8 @@ public abstract class ChannelLoader<T>
|
||||
private const int MAX_SAVE_CACHE_BATCH = 500;
|
||||
private const int MAX_SAVE_CACHE_PARALLELISM = 4;
|
||||
|
||||
private readonly List<Exception> _exceptions = new();
|
||||
private readonly CancellationTokenSource _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
|
||||
private readonly Channel<string> _channel = Channel.CreateBounded<string>(
|
||||
new BoundedChannelOptions(RECEIVE_CAPACITY)
|
||||
{
|
||||
@@ -26,14 +27,10 @@ public abstract class ChannelLoader<T>
|
||||
_ => throw new NotImplementedException("Dropping items not supported.")
|
||||
);
|
||||
|
||||
protected async Task GetAndCache(
|
||||
IEnumerable<string> allChildrenIds,
|
||||
CancellationToken cancellationToken,
|
||||
int? maxParallelism = null
|
||||
) =>
|
||||
protected async Task GetAndCache(IEnumerable<string> allChildrenIds, int? maxParallelism = null) =>
|
||||
await _channel
|
||||
.Source(allChildrenIds, cancellationToken)
|
||||
.Pipe(maxParallelism ?? Environment.ProcessorCount, CheckCache, cancellationToken: cancellationToken)
|
||||
.Source(allChildrenIds, _cts.Token)
|
||||
.Pipe(maxParallelism ?? Environment.ProcessorCount, CheckCache, cancellationToken: _cts.Token)
|
||||
.Filter(x => x is not null)
|
||||
.Batch(HTTP_GET_CHUNK_SIZE)
|
||||
.WithTimeout(HTTP_BATCH_TIMEOUT)
|
||||
@@ -42,52 +39,76 @@ public abstract class ChannelLoader<T>
|
||||
async x => await Download(x).ConfigureAwait(false),
|
||||
-1,
|
||||
false,
|
||||
cancellationToken
|
||||
_cts.Token
|
||||
)
|
||||
.Join()
|
||||
.Batch(MAX_SAVE_CACHE_BATCH)
|
||||
.WithTimeout(HTTP_BATCH_TIMEOUT)
|
||||
.ReadAllConcurrently(maxParallelism ?? MAX_SAVE_CACHE_PARALLELISM, SaveToCache, cancellationToken)
|
||||
.ReadAllConcurrently(maxParallelism ?? MAX_SAVE_CACHE_PARALLELISM, SaveToCache, _cts.Token)
|
||||
.ContinueWith(
|
||||
t =>
|
||||
{
|
||||
Exception? ex = t.Exception;
|
||||
if (ex is null && t.Status is TaskStatus.Canceled && !cancellationToken.IsCancellationRequested)
|
||||
if (ex is null && t.Status is TaskStatus.Canceled && !_cts.Token.IsCancellationRequested)
|
||||
{
|
||||
ex = new OperationCanceledException();
|
||||
}
|
||||
|
||||
if (ex is not null)
|
||||
{
|
||||
if (ex is AggregateException ae)
|
||||
{
|
||||
_exceptions.AddRange(ae.Flatten().InnerExceptions);
|
||||
}
|
||||
else
|
||||
{
|
||||
_exceptions.Add(ex);
|
||||
}
|
||||
RecordException(ex);
|
||||
}
|
||||
|
||||
_channel.Writer.TryComplete(ex);
|
||||
},
|
||||
CancellationToken.None,
|
||||
_cts.Token,
|
||||
TaskContinuationOptions.ExecuteSynchronously,
|
||||
TaskScheduler.Current
|
||||
)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
public void CheckForExceptions()
|
||||
public abstract string? CheckCache(string id);
|
||||
|
||||
public async Task<List<T>> Download(List<string?> ids)
|
||||
{
|
||||
if (_exceptions.Count > 0)
|
||||
try
|
||||
{
|
||||
throw new AggregateException(_exceptions);
|
||||
return await DownloadInternal(ids).ConfigureAwait(false);
|
||||
}
|
||||
#pragma warning disable CA1031
|
||||
catch (Exception ex)
|
||||
#pragma warning restore CA1031
|
||||
{
|
||||
RecordException(ex);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
public abstract string? CheckCache(string id);
|
||||
protected abstract Task<List<T>> DownloadInternal(List<string?> batch);
|
||||
|
||||
public abstract Task<List<T>> Download(List<string?> ids);
|
||||
public void SaveToCache(List<T> batch)
|
||||
{
|
||||
try
|
||||
{
|
||||
SaveToCacheInternal(batch);
|
||||
}
|
||||
#pragma warning disable CA1031
|
||||
catch (Exception ex)
|
||||
#pragma warning restore CA1031
|
||||
{
|
||||
RecordException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public abstract void SaveToCache(List<T> x);
|
||||
protected abstract void SaveToCacheInternal(List<T> batch);
|
||||
|
||||
protected Exception? Exception { get; private set; }
|
||||
|
||||
private void RecordException(Exception ex)
|
||||
{
|
||||
Exception = ex;
|
||||
_channel.Writer.TryComplete(ex);
|
||||
//cancel everything!
|
||||
_cts.Cancel();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ using Speckle.Sdk.Serialisation.V2.Send;
|
||||
|
||||
namespace Speckle.Sdk.Dependencies.Serialization;
|
||||
|
||||
public abstract class ChannelSaver<T>
|
||||
public abstract class ChannelSaver<T>(CancellationToken cancellationToken)
|
||||
where T : IHasByteSize
|
||||
{
|
||||
private const int SEND_CAPACITY = 500;
|
||||
@@ -16,7 +16,8 @@ public abstract class ChannelSaver<T>
|
||||
private const int MAX_CACHE_WRITE_PARALLELISM = 4;
|
||||
private const int MAX_CACHE_BATCH = 500;
|
||||
|
||||
private readonly List<Exception> _exceptions = new();
|
||||
private readonly CancellationTokenSource _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
|
||||
private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
|
||||
new BoundedChannelOptions(SEND_CAPACITY)
|
||||
{
|
||||
@@ -29,7 +30,7 @@ public abstract class ChannelSaver<T>
|
||||
_ => throw new NotImplementedException("Dropping items not supported.")
|
||||
);
|
||||
|
||||
public Task Start(CancellationToken cancellationToken) =>
|
||||
public Task Start() =>
|
||||
_checkCacheChannel
|
||||
.Reader.BatchByByteSize(HTTP_SEND_CHUNK_SIZE)
|
||||
.WithTimeout(HTTP_BATCH_TIMEOUT)
|
||||
@@ -38,75 +39,92 @@ public abstract class ChannelSaver<T>
|
||||
async x => await SendToServer(x).ConfigureAwait(false),
|
||||
HTTP_CAPACITY,
|
||||
false,
|
||||
cancellationToken
|
||||
_cts.Token
|
||||
)
|
||||
.Join()
|
||||
.Batch(MAX_CACHE_BATCH)
|
||||
.WithTimeout(HTTP_BATCH_TIMEOUT)
|
||||
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
|
||||
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, _cts.Token)
|
||||
.ContinueWith(
|
||||
t =>
|
||||
{
|
||||
Exception? ex = t.Exception;
|
||||
if (ex is null && t.Status is TaskStatus.Canceled && !cancellationToken.IsCancellationRequested)
|
||||
if (ex is null && t.Status is TaskStatus.Canceled && !_cts.Token.IsCancellationRequested)
|
||||
{
|
||||
ex = new OperationCanceledException();
|
||||
}
|
||||
|
||||
if (ex is not null)
|
||||
{
|
||||
lock (_exceptions)
|
||||
{
|
||||
_exceptions.Add(ex);
|
||||
}
|
||||
RecordException(ex);
|
||||
}
|
||||
_checkCacheChannel.Writer.TryComplete(ex);
|
||||
},
|
||||
CancellationToken.None,
|
||||
_cts.Token,
|
||||
TaskContinuationOptions.ExecuteSynchronously,
|
||||
TaskScheduler.Current
|
||||
);
|
||||
|
||||
public async ValueTask Save(T item, CancellationToken cancellationToken) =>
|
||||
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
|
||||
public async ValueTask Save(T item)
|
||||
{
|
||||
if (Exception is not null || _cts.IsCancellationRequested)
|
||||
{
|
||||
return; //don't save if we're already done through an error
|
||||
}
|
||||
await _checkCacheChannel.Writer.WriteAsync(item).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch)
|
||||
{
|
||||
await SendToServer((Batch<T>)batch).ConfigureAwait(false);
|
||||
return batch;
|
||||
try
|
||||
{
|
||||
await SendToServer((Batch<T>)batch).ConfigureAwait(false);
|
||||
return batch;
|
||||
}
|
||||
#pragma warning disable CA1031
|
||||
catch (Exception ex)
|
||||
#pragma warning restore CA1031
|
||||
{
|
||||
RecordException(ex);
|
||||
return batch;
|
||||
}
|
||||
}
|
||||
|
||||
public abstract Task SendToServer(Batch<T> batch);
|
||||
public async Task SendToServer(Batch<T> batch)
|
||||
{
|
||||
try
|
||||
{
|
||||
await SendToServerInternal(batch).ConfigureAwait(false);
|
||||
}
|
||||
#pragma warning disable CA1031
|
||||
catch (Exception ex)
|
||||
#pragma warning restore CA1031
|
||||
{
|
||||
RecordException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Task SendToServerInternal(Batch<T> batch);
|
||||
|
||||
public abstract void SaveToCache(List<T> item);
|
||||
|
||||
public Task DoneTraversing()
|
||||
{
|
||||
_checkCacheChannel.Writer.TryComplete();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
public void DoneTraversing() => _checkCacheChannel.Writer.TryComplete();
|
||||
|
||||
public async Task DoneSaving()
|
||||
{
|
||||
await _checkCacheChannel.Reader.Completion.ConfigureAwait(false);
|
||||
lock (_exceptions)
|
||||
if (!_checkCacheChannel.Reader.Completion.IsCompleted)
|
||||
{
|
||||
if (_exceptions.Count > 0)
|
||||
{
|
||||
var exceptions = new List<Exception>();
|
||||
foreach (var ex in _exceptions)
|
||||
{
|
||||
if (ex is AggregateException ae)
|
||||
{
|
||||
exceptions.AddRange(ae.Flatten().InnerExceptions);
|
||||
}
|
||||
else
|
||||
{
|
||||
exceptions.Add(ex);
|
||||
}
|
||||
}
|
||||
throw new AggregateException(exceptions);
|
||||
}
|
||||
await _checkCacheChannel.Reader.Completion.ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
protected Exception? Exception { get; set; }
|
||||
|
||||
private void RecordException(Exception ex)
|
||||
{
|
||||
Exception = ex;
|
||||
_checkCacheChannel.Writer.TryComplete(ex);
|
||||
//cancel everything!
|
||||
_cts.Cancel();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,14 @@ namespace Speckle.Sdk.Api;
|
||||
|
||||
public partial class Operations
|
||||
{
|
||||
/// <summary>
|
||||
/// Receives a Object to the provided URL and Caches the results
|
||||
/// </summary>
|
||||
/// <remarks/>
|
||||
/// <exception cref="ArgumentException">No transports were specified</exception>
|
||||
/// <exception cref="ArgumentNullException">The <paramref name="objectId"/> was <see langword="null"/></exception>
|
||||
/// <exception cref="SpeckleException">Serialization or Send operation was unsuccessful</exception>
|
||||
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> requested cancellation</exception>
|
||||
public async Task<Base> Receive2(
|
||||
Uri url,
|
||||
string streamId,
|
||||
|
||||
@@ -11,6 +11,14 @@ namespace Speckle.Sdk.Api;
|
||||
|
||||
public partial class Operations
|
||||
{
|
||||
/// <summary>
|
||||
/// Sends a Speckle Object to the provided URL and Caches the results
|
||||
/// </summary>
|
||||
/// <remarks/>
|
||||
/// <exception cref="ArgumentException">No transports were specified</exception>
|
||||
/// <exception cref="ArgumentNullException">The <paramref name="value"/> was <see langword="null"/></exception>
|
||||
/// <exception cref="SpeckleException">Serialization or Send operation was unsuccessful</exception>
|
||||
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> requested cancellation</exception>
|
||||
public async Task<SerializeProcessResults> Send2(
|
||||
Uri url,
|
||||
string streamId,
|
||||
|
||||
@@ -17,7 +17,9 @@ public sealed class ObjectLoader(
|
||||
IServerObjectManager serverObjectManager,
|
||||
IProgress<ProgressArgs>? progress,
|
||||
CancellationToken cancellationToken
|
||||
) : ChannelLoader<BaseItem>, IObjectLoader
|
||||
#pragma warning disable CS9107 // Parameter is captured into the state of the enclosing type and its value is also passed to the base constructor. The value might be captured by the base class as well.
|
||||
) : ChannelLoader<BaseItem>(cancellationToken), IObjectLoader
|
||||
#pragma warning restore CS9107 // Parameter is captured into the state of the enclosing type and its value is also passed to the base constructor. The value might be captured by the base class as well.
|
||||
{
|
||||
private int? _allChildrenCount;
|
||||
private long _checkCache;
|
||||
@@ -31,47 +33,55 @@ public sealed class ObjectLoader(
|
||||
|
||||
public async Task<(Json, IReadOnlyCollection<Id>)> GetAndCache(string rootId, DeserializeProcessOptions options)
|
||||
{
|
||||
_options = options;
|
||||
string? rootJson;
|
||||
if (!options.SkipCache)
|
||||
try
|
||||
{
|
||||
rootJson = sqLiteJsonCacheManager.GetObject(rootId);
|
||||
if (rootJson != null)
|
||||
{
|
||||
//assume everything exists as the root is there.
|
||||
var allChildren = ClosureParser.GetChildrenIds(rootJson, cancellationToken).Select(x => new Id(x)).ToList();
|
||||
//this probably yields away from the Main thread to let host apps update progress
|
||||
//in any case, this fixes a Revit only issue for this situation
|
||||
await Task.Yield();
|
||||
return (new(rootJson), allChildren);
|
||||
}
|
||||
}
|
||||
if (!options.SkipServer)
|
||||
{
|
||||
rootJson = await serverObjectManager
|
||||
.DownloadSingleObject(rootId, progress, cancellationToken)
|
||||
.NotNull()
|
||||
.ConfigureAwait(false);
|
||||
IReadOnlyCollection<Id> allChildrenIds = ClosureParser
|
||||
.GetClosures(rootJson, cancellationToken)
|
||||
.OrderByDescending(x => x.Item2)
|
||||
.Select(x => new Id(x.Item1))
|
||||
.Where(x => !x.Value.StartsWith("blob", StringComparison.Ordinal))
|
||||
.Freeze();
|
||||
_allChildrenCount = allChildrenIds.Count;
|
||||
await GetAndCache(allChildrenIds.Select(x => x.Value), cancellationToken, _options.MaxParallelism)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
CheckForExceptions();
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
//save the root last to shortcut later
|
||||
_options = options;
|
||||
string? rootJson;
|
||||
if (!options.SkipCache)
|
||||
{
|
||||
sqLiteJsonCacheManager.SaveObject(rootId, rootJson);
|
||||
rootJson = sqLiteJsonCacheManager.GetObject(rootId);
|
||||
if (rootJson != null)
|
||||
{
|
||||
//assume everything exists as the root is there.
|
||||
var allChildren = ClosureParser.GetChildrenIds(rootJson, cancellationToken).Select(x => new Id(x)).ToList();
|
||||
//this probably yields away from the Main thread to let host apps update progress
|
||||
//in any case, this fixes a Revit only issue for this situation
|
||||
await Task.Yield();
|
||||
return (new(rootJson), allChildren);
|
||||
}
|
||||
}
|
||||
|
||||
return (new(rootJson), allChildrenIds);
|
||||
if (!options.SkipServer)
|
||||
{
|
||||
rootJson = await serverObjectManager
|
||||
.DownloadSingleObject(rootId, progress, cancellationToken)
|
||||
.NotNull()
|
||||
.ConfigureAwait(false);
|
||||
IReadOnlyCollection<Id> allChildrenIds = ClosureParser
|
||||
.GetClosures(rootJson, cancellationToken)
|
||||
.OrderByDescending(x => x.Item2)
|
||||
.Select(x => new Id(x.Item1))
|
||||
.Where(x => !x.Value.StartsWith("blob", StringComparison.Ordinal))
|
||||
.Freeze();
|
||||
_allChildrenCount = allChildrenIds.Count;
|
||||
ThrowIfFailed();
|
||||
await GetAndCache(allChildrenIds.Select(x => x.Value), _options.MaxParallelism).ConfigureAwait(false);
|
||||
ThrowIfFailed();
|
||||
//save the root last to shortcut later
|
||||
if (!options.SkipCache)
|
||||
{
|
||||
sqLiteJsonCacheManager.SaveObject(rootId, rootJson);
|
||||
}
|
||||
ThrowIfFailed();
|
||||
return (new(rootJson), allChildrenIds);
|
||||
}
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
ThrowIfFailed();
|
||||
throw;
|
||||
}
|
||||
|
||||
throw new SpeckleException("Cannot skip server and cache. Please choose one.");
|
||||
}
|
||||
|
||||
@@ -90,7 +100,7 @@ public sealed class ObjectLoader(
|
||||
}
|
||||
|
||||
[AutoInterfaceIgnore]
|
||||
public override async Task<List<BaseItem>> Download(List<string?> ids)
|
||||
protected override async Task<List<BaseItem>> DownloadInternal(List<string?> ids)
|
||||
{
|
||||
var toCache = new List<BaseItem>();
|
||||
await foreach (
|
||||
@@ -117,7 +127,7 @@ public sealed class ObjectLoader(
|
||||
}
|
||||
|
||||
[AutoInterfaceIgnore]
|
||||
public override void SaveToCache(List<BaseItem> batch)
|
||||
protected override void SaveToCacheInternal(List<BaseItem> batch)
|
||||
{
|
||||
if (!_options.SkipCache)
|
||||
{
|
||||
@@ -129,4 +139,13 @@ public sealed class ObjectLoader(
|
||||
}
|
||||
|
||||
public string? LoadId(string id) => sqLiteJsonCacheManager.GetObject(id);
|
||||
|
||||
private void ThrowIfFailed()
|
||||
{
|
||||
if (Exception is not null)
|
||||
{
|
||||
throw new SpeckleException("Error while sending", Exception);
|
||||
}
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,7 +35,9 @@ public sealed class SerializeProcess(
|
||||
ILoggerFactory loggerFactory,
|
||||
CancellationToken cancellationToken,
|
||||
SerializeProcessOptions? options = null
|
||||
) : ChannelSaver<BaseItem>, ISerializeProcess
|
||||
#pragma warning disable CS9107 // Parameter is captured into the state of the enclosing type and its value is also passed to the base constructor. The value might be captured by the base class as well.
|
||||
) : ChannelSaver<BaseItem>(cancellationToken), ISerializeProcess
|
||||
#pragma warning restore CS9107 // Parameter is captured into the state of the enclosing type and its value is also passed to the base constructor. The value might be captured by the base class as well.
|
||||
{
|
||||
//async dispose
|
||||
[SuppressMessage("Usage", "CA2213:Disposable fields should be disposed")]
|
||||
@@ -81,6 +83,15 @@ public sealed class SerializeProcess(
|
||||
await WaitForSchedulerCompletion().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public void ThrowIfFailed()
|
||||
{
|
||||
if (Exception is not null)
|
||||
{
|
||||
throw new SpeckleException("Error while sending", Exception);
|
||||
}
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
}
|
||||
|
||||
private async Task WaitForSchedulerCompletion()
|
||||
{
|
||||
await _highest.WaitForCompletion().ConfigureAwait(false);
|
||||
@@ -89,27 +100,36 @@ public sealed class SerializeProcess(
|
||||
|
||||
public async Task<SerializeProcessResults> Serialize(Base root)
|
||||
{
|
||||
var channelTask = Start(cancellationToken);
|
||||
var findTotalObjectsTask = Task.CompletedTask;
|
||||
if (!_options.SkipFindTotalObjects)
|
||||
try
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
findTotalObjectsTask = Task.Factory.StartNew(
|
||||
() => TraverseTotal(root),
|
||||
cancellationToken,
|
||||
TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness,
|
||||
_highest
|
||||
);
|
||||
var channelTask = Start();
|
||||
var findTotalObjectsTask = Task.CompletedTask;
|
||||
if (!_options.SkipFindTotalObjects)
|
||||
{
|
||||
ThrowIfFailed();
|
||||
findTotalObjectsTask = Task.Factory.StartNew(
|
||||
() => TraverseTotal(root),
|
||||
cancellationToken,
|
||||
TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness,
|
||||
_highest
|
||||
);
|
||||
}
|
||||
|
||||
await Traverse(root).ConfigureAwait(false);
|
||||
DoneTraversing();
|
||||
await Task.WhenAll(findTotalObjectsTask, channelTask).ConfigureAwait(false);
|
||||
ThrowIfFailed();
|
||||
await DoneSaving().ConfigureAwait(false);
|
||||
ThrowIfFailed();
|
||||
await WaitForSchedulerCompletion().ConfigureAwait(false);
|
||||
ThrowIfFailed();
|
||||
return new(root.id.NotNull(), baseSerializer.ObjectReferences.Freeze());
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
ThrowIfFailed();
|
||||
throw;
|
||||
}
|
||||
await Traverse(root).ConfigureAwait(false);
|
||||
await DoneTraversing().ConfigureAwait(false);
|
||||
await Task.WhenAll(findTotalObjectsTask, channelTask).ConfigureAwait(false);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
await DoneSaving().ConfigureAwait(false);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
await WaitForSchedulerCompletion().ConfigureAwait(false);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
return new(root.id.NotNull(), baseSerializer.ObjectReferences.Freeze());
|
||||
}
|
||||
|
||||
private void TraverseTotal(Base obj)
|
||||
@@ -166,7 +186,7 @@ public sealed class SerializeProcess(
|
||||
if (item.NeedsStorage)
|
||||
{
|
||||
Interlocked.Increment(ref _objectsSerialized);
|
||||
await Save(item, cancellationToken).ConfigureAwait(false);
|
||||
await Save(item).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (!currentClosures.ContainsKey(item.Id))
|
||||
@@ -178,7 +198,7 @@ public sealed class SerializeProcess(
|
||||
return currentClosures;
|
||||
}
|
||||
|
||||
public override async Task SendToServer(Batch<BaseItem> batch)
|
||||
protected override async Task SendToServerInternal(Batch<BaseItem> batch)
|
||||
{
|
||||
try
|
||||
{
|
||||
|
||||
+3
-6
@@ -1,8 +1,5 @@
|
||||
{
|
||||
"Type": "System.AggregateException",
|
||||
"InnerException": {
|
||||
"Type": "System.NotImplementedException",
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Source": "Speckle.Sdk.Serialization.Tests"
|
||||
}
|
||||
"Type": "Speckle.Sdk.SpeckleException",
|
||||
"Message": "Error while sending",
|
||||
"Source": "Speckle.Sdk"
|
||||
}
|
||||
|
||||
+3
-6
@@ -1,8 +1,5 @@
|
||||
{
|
||||
"Type": "System.AggregateException",
|
||||
"InnerException": {
|
||||
"Type": "System.NotImplementedException",
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Source": "Speckle.Sdk.Serialization.Tests"
|
||||
}
|
||||
"Type": "Speckle.Sdk.SpeckleException",
|
||||
"Message": "Error while sending",
|
||||
"Source": "Speckle.Sdk"
|
||||
}
|
||||
|
||||
+3
-23
@@ -1,25 +1,5 @@
|
||||
{
|
||||
"Type": "System.AggregateException",
|
||||
"InnerExceptions": [
|
||||
{
|
||||
"Type": "System.NotImplementedException",
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Source": "Speckle.Sdk.Serialization.Tests"
|
||||
},
|
||||
{
|
||||
"Type": "System.NotImplementedException",
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Source": "Speckle.Sdk.Serialization.Tests"
|
||||
},
|
||||
{
|
||||
"Type": "System.NotImplementedException",
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Source": "Speckle.Sdk.Serialization.Tests"
|
||||
},
|
||||
{
|
||||
"Type": "System.NotImplementedException",
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Source": "Speckle.Sdk.Serialization.Tests"
|
||||
}
|
||||
]
|
||||
"Type": "Speckle.Sdk.SpeckleException",
|
||||
"Message": "Error while sending",
|
||||
"Source": "Speckle.Sdk"
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ public class ExceptionTests
|
||||
);
|
||||
|
||||
//4 exceptions are fine because we use 4 threads for saving cache
|
||||
var ex = await Assert.ThrowsAsync<AggregateException>(async () => await process2.Serialize(testClass));
|
||||
var ex = await Assert.ThrowsAsync<SpeckleException>(async () => await process2.Serialize(testClass));
|
||||
await Verify(ex);
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ public class ExceptionTests
|
||||
new SerializeProcessOptions(false, false, false, true)
|
||||
);
|
||||
|
||||
var ex = await Assert.ThrowsAsync<AggregateException>(async () => await process2.Serialize(testClass));
|
||||
var ex = await Assert.ThrowsAsync<SpeckleException>(async () => await process2.Serialize(testClass));
|
||||
await Verify(ex);
|
||||
}
|
||||
|
||||
@@ -137,12 +137,11 @@ public class ExceptionTests
|
||||
}
|
||||
else
|
||||
{
|
||||
ex = await Assert.ThrowsAsync<AggregateException>(async () =>
|
||||
ex = await Assert.ThrowsAsync<SpeckleException>(async () =>
|
||||
{
|
||||
var root = await process.Deserialize(rootId);
|
||||
});
|
||||
}
|
||||
|
||||
await Verify(ex).UseParameters(hasObject);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,21 +4,15 @@ namespace Speckle.Sdk.Testing.Framework;
|
||||
|
||||
public class ExceptionScrubber : WriteOnlyJsonConverter<Exception>
|
||||
{
|
||||
public ExceptionScrubber() { }
|
||||
|
||||
public override void Write(VerifyJsonWriter writer, Exception value)
|
||||
{
|
||||
if (value.StackTrace != null)
|
||||
var ex = new JObject
|
||||
{
|
||||
var ex = new JObject
|
||||
{
|
||||
["Type"] = value.GetType().FullName,
|
||||
["Message"] = value.Message,
|
||||
["Source"] = value.Source?.Trim(),
|
||||
};
|
||||
writer.WriteRawValue(ex.ToString(Formatting.Indented));
|
||||
return;
|
||||
}
|
||||
base.Write(writer, value.ToString());
|
||||
["Type"] = value.GetType().FullName,
|
||||
["Message"] = value.Message,
|
||||
["Source"] = value.Source?.Trim(),
|
||||
};
|
||||
//intentionally removed stacktrace to avoid errors on different machines and line numbers
|
||||
writer.WriteRawValue(ex.ToString(Formatting.Indented));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user