Compare commits

..

5 Commits

Author SHA1 Message Date
Adam Hathcock 377829adae fix(main) exception test correction and token usage (#283)
.NET Build and Publish / build (push) Has been cancelled
* add parallelism on exception after count test

* use scoped token source correctly

* format

* Centralized token usage and made sqlite busy timeout be 5 seconds

* restore write parallelism to 4

* add to comment
2025-04-24 10:40:46 +00:00
Adam Hathcock cc9639b179 Merge pull request #282 from specklesystems/adam/error-fix
fix(main) Wrong error message being displayed in UI
2025-04-24 11:29:10 +01:00
Adam Hathcock b733ce5f29 fix snapshot test message 2025-04-22 11:09:50 +01:00
Adam Hathcock 1c8b2b82d7 Wrong error message being displayed in UI 2025-04-22 10:13:59 +01:00
Jedd Morgan 11cd2dc1cb Update ProjectResourceExceptionalTests.cs (#279) 2025-04-11 12:27:04 +00:00
11 changed files with 101 additions and 46 deletions
@@ -28,21 +28,26 @@ public abstract class ChannelSaver<T>
_ => throw new NotImplementedException("Dropping items not supported.")
);
public Task Start(CancellationToken cancellationToken) =>
public Task Start(
int? maxParallelism,
int? httpBatchSize,
int? cacheBatchSize,
CancellationToken cancellationToken
) =>
_checkCacheChannel
.Reader.BatchByByteSize(HTTP_SEND_CHUNK_SIZE)
.Reader.BatchByByteSize(httpBatchSize ?? HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
maxParallelism ?? MAX_PARALLELISM_HTTP,
async x => await SendToServer(x).ConfigureAwait(false),
HTTP_CAPACITY,
false,
cancellationToken
)
.Join()
.Batch(MAX_CACHE_BATCH)
.Batch(cacheBatchSize ?? MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
.ReadAllConcurrently(maxParallelism ?? MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
.ContinueWith(
t =>
{
@@ -63,9 +68,9 @@ public abstract class ChannelSaver<T>
TaskScheduler.Current
);
public void Save(T item, CancellationToken cancellationToken)
public void Save(T item)
{
if (Exception is not null || cancellationToken.IsCancellationRequested)
if (Exception is not null)
{
return; //don't save if we're already done through an error
}
@@ -72,9 +72,14 @@ public sealed class SqLiteJsonCacheManager : ISqLiteJsonCacheManager
cmd4.ExecuteNonQuery();
}
using (SqliteCommand cmd0 = new("PRAGMA journal_mode='wal';", c))
using (SqliteCommand cmd5 = new("PRAGMA journal_mode='wal';", c))
{
cmd0.ExecuteNonQuery();
cmd5.ExecuteNonQuery();
}
//do this to wait 5 seconds to avoid db lock exceptions, this is 0 by default
using (SqliteCommand cmd6 = new("PRAGMA busy_timeout=5000;", c))
{
cmd6.ExecuteNonQuery();
}
});
}
@@ -146,7 +146,7 @@ public sealed class ObjectLoader(
cancellationToken.ThrowIfCancellationRequested();
if (Exception is not null)
{
throw new SpeckleException("Error while sending", Exception);
throw new SpeckleException("Error while loading", Exception);
}
}
}
@@ -9,10 +9,12 @@ namespace Speckle.Sdk.Serialisation.V2.Send;
public interface IObjectSaver : IDisposable
{
Exception? Exception { get; set; }
Task Start(CancellationToken cancellationToken);
Task Start(int? maxParallelism, int? httpBatchSize, int? cacheBatchSize, CancellationToken cancellationToken);
void DoneTraversing();
Task DoneSaving();
void SaveItem(BaseItem item, CancellationToken cancellationToken);
void SaveItem(BaseItem item);
long Uploaded { get; }
long Cached { get; }
}
public sealed class ObjectSaver(
@@ -38,6 +40,8 @@ public sealed class ObjectSaver(
private long _cached;
private long _objectsSerialized;
public long Cached => _cached;
public long Uploaded => _uploaded;
protected override async Task SendToServerInternal(Batch<BaseItem> batch)
{
@@ -77,10 +81,10 @@ public sealed class ObjectSaver(
}
}
public void SaveItem(BaseItem item, CancellationToken cancellationToken)
public void SaveItem(BaseItem item)
{
Interlocked.Increment(ref _objectsSerialized);
Save(item, cancellationToken);
Save(item);
}
public override void SaveToCache(List<BaseItem> batch)
@@ -14,7 +14,12 @@ public record SerializeProcessOptions(
bool SkipCacheWrite = false,
bool SkipServer = false,
bool SkipFindTotalObjects = false
);
)
{
public int? MaxHttpSendSize { get; set; }
public int? MaxCacheSize { get; set; }
public int? MaxParallelism { get; set; }
}
public readonly record struct SerializeProcessResults(
string RootId,
@@ -99,7 +104,12 @@ public sealed class SerializeProcess(
{
try
{
var channelTask = objectSaver.Start(_processSource.Token);
var channelTask = objectSaver.Start(
options?.MaxParallelism,
options?.MaxHttpSendSize,
options?.MaxCacheSize,
_processSource.Token
);
var findTotalObjectsTask = Task.CompletedTask;
if (!_options.SkipFindTotalObjects)
{
@@ -112,7 +122,7 @@ public sealed class SerializeProcess(
);
}
await Traverse(root, _processSource.Token).ConfigureAwait(false);
await Traverse(root).ConfigureAwait(false);
ThrowIfFailed();
objectSaver.DoneTraversing();
await Task.WhenAll(findTotalObjectsTask, channelTask).ConfigureAwait(false);
@@ -123,7 +133,7 @@ public sealed class SerializeProcess(
ThrowIfFailed();
return new(root.id.NotNull(), baseSerializer.ObjectReferences.Freeze());
}
catch (TaskCanceledException)
catch (OperationCanceledException)
{
ThrowIfFailed();
throw;
@@ -138,15 +148,15 @@ public sealed class SerializeProcess(
}
foreach (var child in baseChildFinder.GetChildren(obj))
{
_objectsFound++;
Interlocked.Increment(ref _objectsFound);
progress?.Report(new(ProgressEvent.FindingChildren, _objectsFound, null));
TraverseTotal(child);
}
}
private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj, CancellationToken token)
private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj)
{
if (token.IsCancellationRequested)
if (_processSource.Token.IsCancellationRequested)
{
return EMPTY_CLOSURES;
}
@@ -154,12 +164,11 @@ public sealed class SerializeProcess(
try
{
var tasks = new List<Task<Dictionary<Id, NodeInfo>>>();
using var childCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
foreach (var child in baseChildFinder.GetChildren(obj))
{
// tmp is necessary because of the way closures close over loop variables
var tmp = child;
if (token.IsCancellationRequested)
if (_processSource.Token.IsCancellationRequested)
{
return EMPTY_CLOSURES;
}
@@ -167,8 +176,8 @@ public sealed class SerializeProcess(
.Factory.StartNew(
// ReSharper disable once AccessToDisposedClosure
// don't need to capture here
async () => await Traverse(tmp, childCancellationTokenSource.Token).ConfigureAwait(false),
childCancellationTokenSource.Token,
async () => await Traverse(tmp).ConfigureAwait(false),
_processSource.Token,
TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness,
_belowNormal
)
@@ -176,7 +185,7 @@ public sealed class SerializeProcess(
tasks.Add(t);
}
if (token.IsCancellationRequested)
if (_processSource.Token.IsCancellationRequested)
{
return EMPTY_CLOSURES;
}
@@ -206,7 +215,7 @@ public sealed class SerializeProcess(
} while (currentTasks.Count > 0);
}
if (token.IsCancellationRequested)
if (_processSource.Token.IsCancellationRequested)
{
return EMPTY_CLOSURES;
}
@@ -222,13 +231,14 @@ public sealed class SerializeProcess(
_currentClosurePool.Return(childClosure);
}
if (token.IsCancellationRequested)
if (_processSource.Token.IsCancellationRequested)
{
return EMPTY_CLOSURES;
}
var items = baseSerializer.Serialise(obj, childClosures, _options.SkipCacheRead, token);
if (token.IsCancellationRequested)
var items = baseSerializer.Serialise(obj, childClosures, _options.SkipCacheRead, _processSource.Token);
if (_processSource.Token.IsCancellationRequested)
{
return EMPTY_CLOSURES;
}
@@ -242,13 +252,13 @@ public sealed class SerializeProcess(
{
if (item.NeedsStorage)
{
if (token.IsCancellationRequested)
if (_processSource.Token.IsCancellationRequested)
{
return EMPTY_CLOSURES;
}
Interlocked.Increment(ref _objectsSerialized);
objectSaver.SaveItem(item, childCancellationTokenSource.Token);
objectSaver.SaveItem(item);
}
if (!currentClosures.ContainsKey(item.Id))
@@ -1,10 +1,10 @@
{
"Data": {},
"InnerException": {
"$type": "NotImplementedException",
"$type": "Exception",
"Data": {},
"Message": "The method or operation is not implemented.",
"Type": "NotImplementedException"
"Message": "Count exceeded",
"Type": "Exception"
},
"Message": "Error while sending",
"Type": "SpeckleException"
@@ -6,6 +6,6 @@
"Message": "The method or operation is not implemented.",
"Type": "NotImplementedException"
},
"Message": "Error while sending",
"Message": "Error while loading",
"Type": "SpeckleException"
}
@@ -66,7 +66,27 @@ public class ExceptionTests
[Fact]
public async Task Test_Exceptions_Cache_ExceptionsAfter_10()
{
var testClass = new TestClass() { RegularProperty = "Hello" };
var @base = new SampleObjectBase2();
@base["dynamicProp"] = 123;
@base.applicationId = "1";
@base.detachedProp = new SamplePropBase2()
{
name = "detachedProp",
applicationId = "2",
line = new Polyline() { units = "test", value = [1.0, 2.0] },
};
@base.detachedProp2 = new SamplePropBase2()
{
name = "detachedProp2",
applicationId = "3",
line = new Polyline() { units = "test", value = [3.0, 2.0] },
};
@base.attachedProp = new SamplePropBase2()
{
name = "attachedProp",
applicationId = "4",
line = new Polyline() { units = "test", value = [3.0, 4.0] },
};
await using var serializeProcess = _factory.CreateSerializeProcess(
new ExceptionSendCacheManager(exceptionsAfter: 10),
@@ -74,9 +94,14 @@ public class ExceptionTests
null,
default,
new SerializeProcessOptions(false, false, false, true)
{
MaxHttpSendSize = 1,
MaxCacheSize = 1,
MaxParallelism = 1,
}
);
var ex = await Assert.ThrowsAsync<SpeckleException>(async () => await serializeProcess.Serialize(testClass));
var ex = await Assert.ThrowsAsync<SpeckleException>(async () => await serializeProcess.Serialize(@base));
await Verify(ex);
}
@@ -4,6 +4,7 @@ namespace Speckle.Sdk.Serialization.Tests.Framework;
public class ExceptionSendCacheManager(bool? hasObject = null, int? exceptionsAfter = null) : ISqLiteJsonCacheManager
{
private readonly object _lock = new();
private int _count;
public void Dispose() { }
@@ -24,18 +25,23 @@ public class ExceptionSendCacheManager(bool? hasObject = null, int? exceptionsAf
private void CheckExceptions()
{
if (exceptionsAfter is not null)
lock (_lock)
{
if (exceptionsAfter.Value > _count)
if (exceptionsAfter is not null)
{
_count++;
if (exceptionsAfter.Value > _count)
{
_count++;
}
else
{
throw new Exception("Count exceeded");
}
}
else
{
throw new Exception("Count exceeded");
throw new NotImplementedException();
}
}
throw new NotImplementedException();
}
}
@@ -100,7 +100,7 @@ public class ModelResourceExceptionalTests : IAsyncLifetime
.Invoking(async () => await Sut.Update(input))
.Should()
.ThrowAsync<AggregateException>();
ex.WithInnerExceptionExactly<SpeckleGraphQLForbiddenException>();
ex.WithInnerExceptionExactly<SpeckleGraphQLStreamNotFoundException>();
}
[Fact]
@@ -73,7 +73,7 @@ public class ProjectResourceExceptionalTests : IAsyncLifetime
var ex = await Assert.ThrowsAsync<AggregateException>(
async () => _ = await Sut.Update(new("NonExistentProject", "My new name"))
);
ex.InnerExceptions.Single().Should().BeOfType<SpeckleGraphQLForbiddenException>();
ex.InnerExceptions.Single().Should().BeOfType<SpeckleGraphQLStreamNotFoundException>();
}
[Fact]