Add exception handling for SerializeProcess with CancellationTokenSource (#211)

* Add exception handling for SerializeProcess with CancellationTokenSource

* formatting

* add exception test to make sure we handle a server exception

* add extra exception and handling to stop

* add comment and another test

* one last chance for user to cancel

* formatting
This commit is contained in:
Adam Hathcock
2025-01-23 17:06:08 +00:00
committed by GitHub
parent cc23c147be
commit f81fc97a91
13 changed files with 285 additions and 37 deletions
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Common;
using Speckle.Sdk.Dependencies;
@@ -31,13 +32,23 @@ public sealed class SerializeProcess(
IServerObjectManager serverObjectManager,
IBaseChildFinder baseChildFinder,
IObjectSerializerFactory objectSerializerFactory,
ILoggerFactory loggerFactory,
SerializeProcessOptions? options = null
) : ChannelSaver<BaseItem>, ISerializeProcess
{
private readonly PriorityScheduler _highest = new(ThreadPriority.Highest, 2);
private readonly PriorityScheduler _belowNormal = new(ThreadPriority.BelowNormal, Environment.ProcessorCount * 2);
private readonly PriorityScheduler _highest = new(
loggerFactory.CreateLogger<PriorityScheduler>(),
ThreadPriority.Highest,
2
);
private readonly PriorityScheduler _belowNormal = new(
loggerFactory.CreateLogger<PriorityScheduler>(),
ThreadPriority.BelowNormal,
Environment.ProcessorCount * 2
);
private readonly SerializeProcessOptions _options = options ?? new(false, false, false, false);
private readonly ILogger<SerializeProcess> _logger = loggerFactory.CreateLogger<SerializeProcess>();
private readonly ConcurrentDictionary<Id, ObjectReference> _objectReferences = new();
private readonly Pool<List<(Id, Json, Closures)>> _pool = Pools.CreateListPool<(Id, Json, Closures)>();
@@ -71,16 +82,15 @@ public sealed class SerializeProcess(
{
findTotalObjectsTask = Task.Factory.StartNew(
() => TraverseTotal(root),
default,
cancellationToken,
TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness,
_highest
);
}
await Traverse(root, cancellationToken).ConfigureAwait(false);
await Done().ConfigureAwait(true);
await channelTask.ConfigureAwait(false);
await findTotalObjectsTask.ConfigureAwait(false);
await Traverse(root, cancellationToken).ConfigureAwait(true);
DoneTraversing();
await Task.WhenAll(findTotalObjectsTask, channelTask).ConfigureAwait(true);
await DoneSaving().ConfigureAwait(true);
return new(root.id.NotNull(), _objectReferences.Freeze());
}
@@ -103,7 +113,10 @@ public sealed class SerializeProcess(
var tmp = child;
var t = Task
.Factory.StartNew(
() => Traverse(tmp, cancellationToken),
async () =>
{
return await Traverse(tmp, cancellationToken).ConfigureAwait(true);
},
cancellationToken,
TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness,
_belowNormal
@@ -204,29 +217,50 @@ public sealed class SerializeProcess(
public override async Task SendToServer(Batch<BaseItem> batch, CancellationToken cancellationToken)
{
if (!_options.SkipServer && batch.Items.Count != 0)
try
{
var objectBatch = batch.Items.Distinct().ToList();
var hasObjects = await serverObjectManager
.HasObjects(objectBatch.Select(x => x.Id.Value).Freeze(), cancellationToken)
.ConfigureAwait(false);
objectBatch = batch.Items.Where(x => !hasObjects[x.Id.Value]).ToList();
if (objectBatch.Count != 0)
if (!_options.SkipServer && batch.Items.Count != 0)
{
await serverObjectManager.UploadObjects(objectBatch, true, progress, cancellationToken).ConfigureAwait(false);
Interlocked.Exchange(ref _uploaded, _uploaded + batch.Items.Count);
var objectBatch = batch.Items.Distinct().ToList();
var hasObjects = await serverObjectManager
.HasObjects(objectBatch.Select(x => x.Id.Value).Freeze(), cancellationToken)
.ConfigureAwait(false);
objectBatch = batch.Items.Where(x => !hasObjects[x.Id.Value]).ToList();
if (objectBatch.Count != 0)
{
await serverObjectManager.UploadObjects(objectBatch, true, progress, cancellationToken).ConfigureAwait(false);
Interlocked.Exchange(ref _uploaded, _uploaded + batch.Items.Count);
}
progress?.Report(new(ProgressEvent.UploadedObjects, _uploaded, null));
}
progress?.Report(new(ProgressEvent.UploadedObjects, _uploaded, null));
}
#pragma warning disable CA1031
catch (Exception e)
#pragma warning restore CA1031
{
_logger.LogError(e, "Error sending objects to server");
throw;
}
}
public override void SaveToCache(List<BaseItem> batch)
{
if (!_options.SkipCacheWrite && batch.Count != 0)
try
{
sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id.Value, x.Json.Value)));
Interlocked.Exchange(ref _cached, _cached + batch.Count);
progress?.Report(new(ProgressEvent.CachedToLocal, _cached, _objectsSerialized));
if (!_options.SkipCacheWrite && batch.Count != 0)
{
sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id.Value, x.Json.Value)));
Interlocked.Exchange(ref _cached, _cached + batch.Count);
progress?.Report(new(ProgressEvent.CachedToLocal, _cached, _objectsSerialized));
}
}
#pragma warning disable CA1031
catch (Exception e)
#pragma warning restore CA1031
{
_logger.LogError(e, "Error sending objects to server");
throw;
}
}
}