fix(threading): fixes race condition in settings vars on worker done() calls

changes SetData to int value (1=setting, 0=not setting), and moves all ops to interlocked calls on
ints - fingers crossed
This commit is contained in:
Dimitrie Stefanescu
2021-02-03 19:49:21 +00:00
parent c4dd423018
commit 92f814dab4
2 changed files with 65 additions and 33 deletions
+60 -30
View File
@@ -1,10 +1,11 @@
using System;
using Grasshopper.Kernel;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grasshopper.Kernel;
using Timer = System.Timers.Timer;
namespace GrasshopperAsyncComponent
@@ -21,14 +22,14 @@ namespace GrasshopperAsyncComponent
Action<string, double> ReportProgress;
public ConcurrentDictionary<string, double> ProgressReports;
Action Done;
Timer DisplayProgressTimer;
int State = 0;
bool SetData = false;
int SetData = 0;
public List<WorkerInstance> Workers;
@@ -55,16 +56,19 @@ namespace GrasshopperAsyncComponent
ReportProgress = (id, value) =>
{
ProgressReports[id] = value;
if (!DisplayProgressTimer.Enabled) DisplayProgressTimer.Start();
if (!DisplayProgressTimer.Enabled)
{
DisplayProgressTimer.Start();
}
};
Done = () =>
{
State++;
if (State == Workers.Count)
Interlocked.Increment(ref State);
if (State == Workers.Count && SetData == 0)
{
SetData = true;
Interlocked.Exchange(ref SetData, 1);
// We need to reverse the workers list to set the outputs in the same order as the inputs.
Workers.Reverse();
@@ -84,7 +88,11 @@ namespace GrasshopperAsyncComponent
public virtual void DisplayProgress(object sender, System.Timers.ElapsedEventArgs e)
{
if (Workers.Count == 0) return;
if (Workers.Count == 0)
{
return;
}
if (Workers.Count == 1)
{
Message = ProgressReports.Values.Last().ToString("0.00%");
@@ -108,39 +116,52 @@ namespace GrasshopperAsyncComponent
protected override void BeforeSolveInstance()
{
if (State != 0 && SetData) return;
if (State != 0 && SetData == 1)
{
return;
}
foreach (var source in CancellationSources) source.Cancel();
Debug.WriteLine("Killing");
foreach (var source in CancellationSources)
{
source.Cancel();
}
CancellationSources.Clear();
Workers.Clear();
ProgressReports.Clear();
Tasks.Clear();
State = 0;
//var test = Params.Output[0].VolatileData;
Interlocked.Exchange(ref State, 0);
}
protected override void AfterSolveInstance()
{
System.Diagnostics.Debug.WriteLine("After solve instance was called " + State + " ? " + Workers.Count);
// We need to start all the tasks as close as possible to each other.
if (State == 0 && Tasks.Count > 0)
if (State == 0 && Tasks.Count > 0 && SetData == 0)
{
foreach (var task in Tasks) task.Start();
//var test = Params.Output[0].VolatileData;
System.Diagnostics.Debug.WriteLine("After solve INVOKATIONM");
foreach (var task in Tasks)
{
task.Start();
}
}
}
protected override void ExpireDownStreamObjects()
{
// Prevents the flash of null data until the new solution is ready
if (SetData)
if (SetData == 1)
{
base.ExpireDownStreamObjects();
}
}
protected override void SolveInstance(IGH_DataAccess DA)
{
//return;
if (State == 0)
{
if (BaseWorker == null)
@@ -164,13 +185,13 @@ namespace GrasshopperAsyncComponent
currentWorker.CancellationToken = tokenSource.Token;
currentWorker.Id = $"Worker-{DA.Iteration}";
var currentRun = TaskCreationOptions != null
? new Task(() => currentWorker.DoWork(ReportProgress, Done), tokenSource.Token, (TaskCreationOptions)TaskCreationOptions)
var currentRun = TaskCreationOptions != null
? new Task(() => currentWorker.DoWork(ReportProgress, Done), tokenSource.Token, (TaskCreationOptions)TaskCreationOptions)
: new Task(() => currentWorker.DoWork(ReportProgress, Done), tokenSource.Token);
// Add cancellation source to our bag
CancellationSources.Add(tokenSource);
// Add the worker to our list
Workers.Add(currentWorker);
@@ -179,19 +200,28 @@ namespace GrasshopperAsyncComponent
return;
}
if (!SetData) return;
if (Workers.Count > 0)
Workers[--State].SetData(DA);
if (SetData == 0)
{
return;
}
if (Workers.Count > 0)
{
Interlocked.Decrement(ref State);
Workers[State].SetData(DA);
}
if (State != 0)
{
return;
}
if (State != 0) return;
CancellationSources.Clear();
Workers.Clear();
ProgressReports.Clear();
Tasks.Clear();
SetData = false;
Interlocked.Exchange(ref SetData, 0);
Message = "Done";
OnDisplayExpired(true);
@@ -4,7 +4,7 @@
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{114D5E49-AC13-47F7-A70E-B4289579F4E3}</ProjectGuid>
<ProjectGuid>{695D2B91-DDB6-416E-8A99-DDE6253DA7AA}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>GrasshopperAsyncComponent</RootNamespace>
@@ -45,10 +45,12 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Grasshopper">
<Version>6.29.20238.11501</Version>
<Version>6.33.20343.16431</Version>
<IncludeAssets>compile; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="RhinoCommon">
<Version>6.29.20238.11501</Version>
<Version>6.33.20343.16431</Version>
<IncludeAssets>compile; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />