diff --git a/src/Speckle.Sdk/Serialisation/V2/PriorityScheduler.cs b/src/Speckle.Sdk/Serialisation/V2/PriorityScheduler.cs index 7741cfbc..a1fd670a 100644 --- a/src/Speckle.Sdk/Serialisation/V2/PriorityScheduler.cs +++ b/src/Speckle.Sdk/Serialisation/V2/PriorityScheduler.cs @@ -8,20 +8,35 @@ public sealed class PriorityScheduler( ThreadPriority priority, int maximumConcurrencyLevel, CancellationToken cancellationToken -) : TaskScheduler, IDisposable +) : TaskScheduler, IAsyncDisposable { -#pragma warning disable CA2213 - //intentionally not disposing this because syncing to when all the threads are done AFTER the BC is done/disposed is hard. BC will still be cleaned up by the finalizer private readonly BlockingCollection _tasks = new(); -#pragma warning restore CA2213 private Thread[]? _threads; - public void Dispose() => _tasks.CompleteAdding(); - public override int MaximumConcurrencyLevel => maximumConcurrencyLevel; protected override IEnumerable GetScheduledTasks() => _tasks; + public async ValueTask DisposeAsync() + { + await WaitForCompletion().ConfigureAwait(false); + _tasks.Dispose(); + } + + public async ValueTask WaitForCompletion() + { + if (_tasks.IsCompleted && _threads is null) + { + return; + } + _tasks.CompleteAdding(); + while (_threads != null && _threads.Any(x => x.IsAlive)) + { + await Task.Delay(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); + } + _threads = null; + } + protected override void QueueTask(Task task) { _tasks.Add(task); @@ -85,23 +100,4 @@ public sealed class PriorityScheduler( } protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false; // we might not want to execute task that should schedule as high or low priority inline - - public Task WaitForCompletion() - { - _tasks.CompleteAdding(); - return Task - .Factory.StartNew( - async () => - { - while (_threads != null && _threads.Any(x => x.IsAlive)) - { - await Task.Delay(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false); - } - }, - CancellationToken.None, - TaskCreationOptions.AttachedToParent, - TaskScheduler.Default - ) - .Unwrap(); - } } diff --git a/src/Speckle.Sdk/Serialisation/V2/Receive/DeserializeProcess.cs b/src/Speckle.Sdk/Serialisation/V2/Receive/DeserializeProcess.cs index cbe8feac..e0e62922 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Receive/DeserializeProcess.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Receive/DeserializeProcess.cs @@ -71,8 +71,8 @@ public sealed class DeserializeProcess( public async ValueTask DisposeAsync() { objectLoader.Dispose(); - _belowNormal.Dispose(); await _belowNormal.WaitForCompletion().ConfigureAwait(false); + await _belowNormal.DisposeAsync().ConfigureAwait(false); } /// diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs b/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs index bcddbe92..04edb89e 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs @@ -81,10 +81,10 @@ public sealed class SerializeProcess( [AutoInterfaceIgnore] public async ValueTask DisposeAsync() { - _highest.Dispose(); - _belowNormal.Dispose(); - sqLiteJsonCacheManager.Dispose(); await WaitForSchedulerCompletion().ConfigureAwait(false); + await _highest.DisposeAsync().ConfigureAwait(false); + await _belowNormal.DisposeAsync().ConfigureAwait(false); + sqLiteJsonCacheManager.Dispose(); } public void ThrowIfFailed()