refactor!: Major code cleanup for clearer state (#27)
.NET Build and Publish / build (push) Has been cancelled

* Quick pass

* no locks

* github actions

* publish via github actions

* pack

* Parent non-optional

* Generics are fun

* second pass

* Group workers, CTS, and Task in a class for simpler nullability

* pollysharp

* reverted cancellation changes

* reset state

* Update samples

* changes

* still working

* the fix

* private workers

* comments

* comments 2

* comments 3

* public worker count
This commit is contained in:
Jedd Morgan
2025-06-16 13:00:44 +01:00
committed by GitHub
parent 442b06dff7
commit a50809bc91
5 changed files with 178 additions and 133 deletions
+94 -68
View File
@@ -1,51 +1,76 @@
using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Diagnostics;
using Grasshopper.Kernel;
using Timer = System.Timers.Timer;
namespace GrasshopperAsyncComponent;
internal sealed class Worker<T> : IDisposable
where T : GH_Component
{
public required WorkerInstance<T> Instance { get; init; }
public required Task Task { get; init; }
public required CancellationTokenSource CancellationSource { get; init; }
public void Cancel() => CancellationSource.Cancel();
public void Dispose()
{
if (Task.IsCompleted)
{
Task.Dispose();
}
CancellationSource.Dispose();
}
}
/// <summary>
/// Inherit your component from this class to make all the async goodness available.
/// </summary>
public abstract class GH_AsyncComponent : GH_Component, IDisposable
public abstract class GH_AsyncComponent<T> : GH_Component, IDisposable
where T : GH_Component
{
//List<(string, GH_RuntimeMessageLevel)> Errors;
private readonly Action<string, double> _reportProgress;
public ConcurrentDictionary<string, double> ProgressReports { get; protected set; }
private readonly Action _done;
public ConcurrentDictionary<string, double> ProgressReports { get; }
private readonly Timer _displayProgressTimer;
private int _state;
/// <summary>
/// a counter, used to count up the number of workers that have completed,
/// until _setData is set true, when it starts to count down the workers as their data is set.
/// </summary>
private volatile int _state;
private int _setData;
/// <summary>
/// functionally, a boolean, 1 or 0;
/// it will be set to 1 once all workers are ready for SetData to be called on them, then set back to 0.
/// </summary>
private volatile int _setData;
public List<WorkerInstance> Workers { get; protected set; }
private readonly List<Worker<T>> _workers;
private readonly List<Task> _tasks;
public List<CancellationTokenSource> CancellationSources { get; }
public int WorkerCount => _workers.Count;
/// <summary>
/// Set this property inside the constructor of your derived component.
/// </summary>
public WorkerInstance? BaseWorker { get; set; }
public WorkerInstance<T>? BaseWorker { get; set; }
/// <summary>
/// Optional: if you have opinions on how the default system task scheduler should treat your workers, set it here.
/// </summary>
public TaskCreationOptions? TaskCreationOptions { get; set; }
public TaskCreationOptions TaskCreationOptions { get; set; } = TaskCreationOptions.None;
protected GH_AsyncComponent(string name, string nickname, string description, string category, string subCategory)
: base(name, nickname, description, category, subCategory)
{
Workers = new List<WorkerInstance>();
CancellationSources = new List<CancellationTokenSource>();
_tasks = new List<Task>();
_workers = new List<Worker<T>>();
ProgressReports = new ConcurrentDictionary<string, double>();
_displayProgressTimer = new Timer(333) { AutoReset = false };
@@ -59,16 +84,17 @@ public abstract class GH_AsyncComponent : GH_Component, IDisposable
_displayProgressTimer.Start();
}
};
}
_done = () =>
private void Done()
{
Interlocked.Increment(ref _state);
if (_state == Workers.Count && _setData == 0)
if (_state == _workers.Count && _setData == 0)
{
Interlocked.Exchange(ref _setData, 1);
// We need to reverse the workers list to set the outputs in the same order as the inputs.
Workers.Reverse();
_workers.Reverse();
Rhino.RhinoApp.InvokeOnUiThread(
(Action)
@@ -78,17 +104,16 @@ public abstract class GH_AsyncComponent : GH_Component, IDisposable
}
);
}
};
}
public virtual void DisplayProgress(object sender, System.Timers.ElapsedEventArgs e)
{
if (Workers.Count == 0 || ProgressReports.Values.Count == 0)
if (_workers.Count == 0 || ProgressReports.Values.Count == 0)
{
return;
}
if (Workers.Count == 1)
if (_workers.Count == 1)
{
Message = ProgressReports.Values.Last().ToString("0.00%");
}
@@ -100,7 +125,7 @@ public abstract class GH_AsyncComponent : GH_Component, IDisposable
total += kvp.Value;
}
Message = (total / Workers.Count).ToString("0.00%");
Message = (total / _workers.Count).ToString("0.00%");
}
Rhino.RhinoApp.InvokeOnUiThread(
@@ -121,29 +146,24 @@ public abstract class GH_AsyncComponent : GH_Component, IDisposable
Debug.WriteLine("Killing");
foreach (var source in CancellationSources)
foreach (var currentWorker in _workers)
{
source.Cancel();
currentWorker.Cancel();
}
CancellationSources.Clear();
Workers.Clear();
ProgressReports.Clear();
_tasks.Clear();
Interlocked.Exchange(ref _state, 0);
ResetState();
}
protected override void AfterSolveInstance()
{
System.Diagnostics.Debug.WriteLine("After solve instance was called " + _state + " ? " + Workers.Count);
Debug.WriteLine("After solve instance was called " + _state + " ? " + _workers.Count);
// We need to start all the tasks as close as possible to each other.
if (_state == 0 && _tasks.Count > 0 && _setData == 0)
if (_state == 0 && _workers.Count > 0 && _setData == 0)
{
System.Diagnostics.Debug.WriteLine("After solve INVOKATION");
foreach (var task in _tasks)
Debug.WriteLine("After solve INVOCATION");
foreach (var worker in _workers)
{
task.Start();
worker.Task.Start();
}
}
}
@@ -170,31 +190,30 @@ public abstract class GH_AsyncComponent : GH_Component, IDisposable
// Add cancellation source to our bag
var tokenSource = new CancellationTokenSource();
CancellationSources.Add(tokenSource);
var currentWorker = BaseWorker.Duplicate($"Worker-{da.Iteration}", tokenSource.Token);
if (currentWorker == null)
{
AddRuntimeMessage(GH_RuntimeMessageLevel.Error, "Could not get a worker instance.");
return;
}
// Let the worker collect data.
currentWorker.GetData(da, Params);
var currentRun =
TaskCreationOptions != null
? new Task(
() => currentWorker.DoWork(_reportProgress, _done),
var currentRun = new Task<Task>(
async () =>
{
await currentWorker.DoWork(_reportProgress, Done).ConfigureAwait(true);
},
tokenSource.Token,
(TaskCreationOptions)TaskCreationOptions
)
: new Task(() => currentWorker.DoWork(_reportProgress, _done), tokenSource.Token);
TaskCreationOptions
);
// Add the worker to our list
Workers.Add(currentWorker);
_tasks.Add(currentRun);
_workers.Add(
new()
{
Instance = currentWorker,
Task = currentRun,
CancellationSource = tokenSource,
}
);
return;
}
@@ -204,10 +223,10 @@ public abstract class GH_AsyncComponent : GH_Component, IDisposable
return;
}
if (Workers.Count > 0)
if (_workers.Count > 0)
{
Interlocked.Decrement(ref _state);
Workers[_state].SetData(da);
_workers[_state].Instance.SetData(da);
}
if (_state != 0)
@@ -215,31 +234,34 @@ public abstract class GH_AsyncComponent : GH_Component, IDisposable
return;
}
CancellationSources.Clear();
Workers.Clear();
ProgressReports.Clear();
_tasks.Clear();
foreach (var worker in _workers)
{
worker?.Dispose();
}
Interlocked.Exchange(ref _setData, 0);
ResetState();
Message = "Done";
OnDisplayExpired(true);
}
public void RequestCancellation()
private void ResetState()
{
foreach (var source in CancellationSources)
{
source.Cancel();
}
CancellationSources.Clear();
Workers.Clear();
_workers.Clear();
ProgressReports.Clear();
_tasks.Clear();
Interlocked.Exchange(ref _state, 0);
Interlocked.Exchange(ref _setData, 0);
}
public void RequestCancellation()
{
foreach (var worker in _workers)
{
worker.Cancel();
}
ResetState();
Message = "Cancelled";
OnDisplayExpired(true);
}
@@ -260,6 +282,10 @@ public abstract class GH_AsyncComponent : GH_Component, IDisposable
if (disposing)
{
foreach (var worker in _workers)
{
worker?.Dispose();
}
_displayProgressTimer?.Dispose();
}
}
@@ -21,6 +21,7 @@
<ItemGroup Label="Package References">
<PackageReference Include="Grasshopper" Version="7.4.21078.1001" IncludeAssets="compile;build" PrivateAssets="all" />
<PackageReference Include="PolySharp" Version="1.14.1" />
</ItemGroup>
<ItemGroup>
+15 -9
View File
@@ -1,16 +1,17 @@
using Grasshopper.Kernel;
using Grasshopper.Kernel;
namespace GrasshopperAsyncComponent;
/// <summary>
/// A class that holds the actual compute logic and encapsulates the state it needs. Every <see cref="GH_AsyncComponent"/> needs to have one.
/// A class that holds the actual compute logic and encapsulates the state it needs. Every <see cref="GH_AsyncComponent{T}"/> needs to have one.
/// </summary>
public abstract class WorkerInstance(GH_Component? parent, string id, CancellationToken cancellationToken)
public abstract class WorkerInstance<T>(T parent, string id, CancellationToken cancellationToken)
where T : GH_Component
{
/// <summary>
/// The parent component. Useful for passing state back to the host component.
/// </summary>
public GH_Component? Parent { get; set; } = parent;
public T Parent { get; set; } = parent;
public CancellationToken CancellationToken { get; } = cancellationToken;
@@ -22,24 +23,29 @@ public abstract class WorkerInstance(GH_Component? parent, string id, Cancellati
/// <param name="id">A Unique id for the new duplicate</param>
/// <param name="cancellationToken">A cancellationToken to be passed to the new duplicate</param>
/// <returns></returns>
public abstract WorkerInstance? Duplicate(string id, CancellationToken cancellationToken);
public abstract WorkerInstance<T> Duplicate(string id, CancellationToken cancellationToken);
/// <summary>
/// This method is where the actual calculation/computation/heavy lifting should be done.
/// <b>Make sure you always check as frequently as you can if <see cref="WorkerInstance.CancellationToken"/> is cancelled. For an example, see the PrimeCalculatorWorker example.</b>
/// <b>Make sure you always check as frequently as you can if <see cref="WorkerInstance{T}.CancellationToken"/> is cancelled. For an example, see the PrimeCalculatorWorker example.</b>
/// </summary>
/// <remarks>
/// If you don't need <see langword="async"/> function, then you can simply return <see cref="Task.CompletedTask"/>
/// Either way, you should be sure to handle exceptions in this function. Otherwise, they will be Unobserved!
/// You can call <paramref name="done"/> on <see cref="Exception"/>s, but avoid calling it when cancellation is has been observed.
/// </remarks>
/// <param name="reportProgress">Call this to report progress up to the parent component.</param>
/// <param name="done">Call this when everything is <b>done</b>. It will tell the parent component that you're ready to <see cref="SetData(IGH_DataAccess)"/>.</param>
public abstract void DoWork(Action<string, double> reportProgress, Action done);
public abstract Task DoWork(Action<string, double> reportProgress, Action done);
/// <summary>
/// Write your data setting logic here. <b>Do not call this function directly from this class. It will be invoked by the parent <see cref="GH_AsyncComponent"/> after you've called `Done` in the <see cref="DoWork"/> function.</b>
/// Write your data setting logic here. <b>Do not call this function directly from this class. It will be invoked by the parent <see cref="GH_AsyncComponent{T}"/> after you've called `Done` in the <see cref="DoWork"/> function.</b>
/// </summary>
/// <param name="da"></param>
public abstract void SetData(IGH_DataAccess da);
/// <summary>
/// Write your data collection logic here. <b>Do not call this method directly. It will be invoked by the parent <see cref="GH_AsyncComponent"/>.</b>
/// Write your data collection logic here. <b>Do not call this method directly. It will be invoked by the parent <see cref="GH_AsyncComponent{T}"/>.</b>
/// </summary>
/// <param name="da"></param>
/// <param name="parameters"></param>
@@ -1,10 +1,10 @@
using System.Windows.Forms;
using System.Windows.Forms;
using Grasshopper.Kernel;
using GrasshopperAsyncComponent;
namespace GrasshopperAsyncComponentDemo.SampleImplementations;
public class Sample_PrimeCalculatorAsyncComponent : GH_AsyncComponent
public class Sample_PrimeCalculatorAsyncComponent : GH_AsyncComponent<Sample_PrimeCalculatorAsyncComponent>
{
public override Guid ComponentGuid => new Guid("22C612B0-2C57-47CE-B9FE-E10621F18933");
@@ -46,25 +46,38 @@ public class Sample_PrimeCalculatorAsyncComponent : GH_AsyncComponent
);
}
private sealed class PrimeCalculatorWorker : WorkerInstance
private sealed class PrimeCalculatorWorker : WorkerInstance<Sample_PrimeCalculatorAsyncComponent>
{
private int TheNthPrime { get; set; } = 100;
private long ThePrime { get; set; } = -1;
public PrimeCalculatorWorker(
GH_Component? parent,
Sample_PrimeCalculatorAsyncComponent parent,
string id = "baseworker",
CancellationToken cancellationToken = default
)
: base(parent, id, cancellationToken) { }
public override void DoWork(Action<string, double> reportProgress, Action done)
public override Task DoWork(Action<string, double> reportProgress, Action done)
{
try
{
CalculatePrimes(reportProgress);
done();
}
catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
{
//No need to call `done()` - GrasshopperAsyncComponent assumes immediate cancel,
//thus it has already performed clean-up actions that would normally be done on `done()`
}
return Task.CompletedTask;
}
public void CalculatePrimes(Action<string, double> reportProgress)
{
// 👉 Checking for cancellation!
if (CancellationToken.IsCancellationRequested)
{
return;
}
CancellationToken.ThrowIfCancellationRequested();
int count = 0;
long a = 2;
@@ -73,20 +86,14 @@ public class Sample_PrimeCalculatorAsyncComponent : GH_AsyncComponent
while (count < TheNthPrime)
{
// 👉 Checking for cancellation!
if (CancellationToken.IsCancellationRequested)
{
return;
}
CancellationToken.ThrowIfCancellationRequested();
long b = 2;
int prime = 1; // to check if found a prime
while (b * b <= a)
{
// 👉 Checking for cancellation!
if (CancellationToken.IsCancellationRequested)
{
return;
}
CancellationToken.ThrowIfCancellationRequested();
if (a % b == 0)
{
@@ -106,11 +113,12 @@ public class Sample_PrimeCalculatorAsyncComponent : GH_AsyncComponent
}
ThePrime = --a;
done();
}
public override WorkerInstance Duplicate(string id, CancellationToken cancellationToken) =>
new PrimeCalculatorWorker(Parent, id, cancellationToken);
public override WorkerInstance<Sample_PrimeCalculatorAsyncComponent> Duplicate(
string id,
CancellationToken cancellationToken
) => new PrimeCalculatorWorker(Parent, id, cancellationToken);
public override void GetData(IGH_DataAccess da, GH_ComponentParamServer parameters)
{
@@ -131,12 +139,6 @@ public class Sample_PrimeCalculatorAsyncComponent : GH_AsyncComponent
public override void SetData(IGH_DataAccess da)
{
// 👉 Checking for cancellation!
if (CancellationToken.IsCancellationRequested)
{
return;
}
da.SetData(0, ThePrime);
}
}
@@ -1,10 +1,10 @@
using System.Windows.Forms;
using System.Windows.Forms;
using Grasshopper.Kernel;
using GrasshopperAsyncComponent;
namespace GrasshopperAsyncComponentDemo.SampleImplementations;
public class Sample_UselessCyclesAsyncComponent : GH_AsyncComponent
public class Sample_UselessCyclesAsyncComponent : GH_AsyncComponent<Sample_UselessCyclesAsyncComponent>
{
public override Guid ComponentGuid => new Guid("DF2B93E2-052D-4BE4-BC62-90DC1F169BF6");
@@ -19,20 +19,33 @@ public class Sample_UselessCyclesAsyncComponent : GH_AsyncComponent
}
private sealed class UselessCyclesWorker(
GH_Component? parent,
Sample_UselessCyclesAsyncComponent parent,
string id = "baseworker",
CancellationToken cancellationToken = default
) : WorkerInstance(parent, id, cancellationToken)
) : WorkerInstance<Sample_UselessCyclesAsyncComponent>(parent, id, cancellationToken)
{
private int MaxIterations { get; set; } = 100;
public override void DoWork(Action<string, double> reportProgress, Action done)
public override Task DoWork(Action<string, double> reportProgress, Action done)
{
try
{
RunUselessCycles(reportProgress);
done();
}
catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
{
//No need to call `done()` - GrasshopperAsyncComponent assumes immediate cancel,
//thus it has already performed clean-up actions that would normally be done on `done()`
}
return Task.CompletedTask;
}
public void RunUselessCycles(Action<string, double> reportProgress)
{
// Checking for cancellation
if (CancellationToken.IsCancellationRequested)
{
return;
}
CancellationToken.ThrowIfCancellationRequested();
for (int i = 0; i <= MaxIterations; i++)
{
@@ -45,17 +58,14 @@ public class Sample_UselessCyclesAsyncComponent : GH_AsyncComponent
reportProgress(Id, (i + 1) / (double)MaxIterations);
// Checking for cancellation
if (CancellationToken.IsCancellationRequested)
{
return;
CancellationToken.ThrowIfCancellationRequested();
}
}
done();
}
public override WorkerInstance Duplicate(string id, CancellationToken cancellationToken) =>
new UselessCyclesWorker(Parent, id, cancellationToken);
public override WorkerInstance<Sample_UselessCyclesAsyncComponent> Duplicate(
string id,
CancellationToken cancellationToken
) => new UselessCyclesWorker(Parent, id, cancellationToken);
public override void GetData(IGH_DataAccess da, GH_ComponentParamServer parameters)
{