Use IAsyncDisposable on scheduler which also waits for completion (#235)

* Use IAsyncDisposable on scheduler which also waits for completion

* formatting
This commit is contained in:
Adam Hathcock
2025-03-03 14:50:38 +00:00
committed by GitHub
parent 1fea4cc01b
commit fe964f4c8e
3 changed files with 25 additions and 29 deletions
@@ -8,20 +8,35 @@ public sealed class PriorityScheduler(
ThreadPriority priority,
int maximumConcurrencyLevel,
CancellationToken cancellationToken
) : TaskScheduler, IDisposable
) : TaskScheduler, IAsyncDisposable
{
#pragma warning disable CA2213
//intentionally not disposing this because syncing to when all the threads are done AFTER the BC is done/disposed is hard. BC will still be cleaned up by the finalizer
private readonly BlockingCollection<Task> _tasks = new();
#pragma warning restore CA2213
private Thread[]? _threads;
public void Dispose() => _tasks.CompleteAdding();
public override int MaximumConcurrencyLevel => maximumConcurrencyLevel;
protected override IEnumerable<Task> GetScheduledTasks() => _tasks;
public async ValueTask DisposeAsync()
{
await WaitForCompletion().ConfigureAwait(false);
_tasks.Dispose();
}
public async ValueTask WaitForCompletion()
{
if (_tasks.IsCompleted && _threads is null)
{
return;
}
_tasks.CompleteAdding();
while (_threads != null && _threads.Any(x => x.IsAlive))
{
await Task.Delay(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false);
}
_threads = null;
}
protected override void QueueTask(Task task)
{
_tasks.Add(task);
@@ -85,23 +100,4 @@ public sealed class PriorityScheduler(
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false; // we might not want to execute task that should schedule as high or low priority inline
public Task WaitForCompletion()
{
_tasks.CompleteAdding();
return Task
.Factory.StartNew(
async () =>
{
while (_threads != null && _threads.Any(x => x.IsAlive))
{
await Task.Delay(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false);
}
},
CancellationToken.None,
TaskCreationOptions.AttachedToParent,
TaskScheduler.Default
)
.Unwrap();
}
}
@@ -71,8 +71,8 @@ public sealed class DeserializeProcess(
public async ValueTask DisposeAsync()
{
objectLoader.Dispose();
_belowNormal.Dispose();
await _belowNormal.WaitForCompletion().ConfigureAwait(false);
await _belowNormal.DisposeAsync().ConfigureAwait(false);
}
/// <summary>
@@ -81,10 +81,10 @@ public sealed class SerializeProcess(
[AutoInterfaceIgnore]
public async ValueTask DisposeAsync()
{
_highest.Dispose();
_belowNormal.Dispose();
sqLiteJsonCacheManager.Dispose();
await WaitForSchedulerCompletion().ConfigureAwait(false);
await _highest.DisposeAsync().ConfigureAwait(false);
await _belowNormal.DisposeAsync().ConfigureAwait(false);
sqLiteJsonCacheManager.Dispose();
}
public void ThrowIfFailed()