Compare commits

...

12 Commits

Author SHA1 Message Date
Adam Hathcock 24db4c4ae4 Merge pull request #288 from specklesystems/adam/no-drop-writes
.NET Build and Publish / build (push) Has been cancelled
fix (main) Don't drop items to write when sending fast
2025-04-28 10:18:48 +01:00
Adam Hathcock edf63d4a1b fix build issue 2025-04-28 09:39:46 +01:00
Adam Hathcock b5b0922e7f Revert to write async 2025-04-28 09:35:02 +01:00
Adam Hathcock ff390f772d just wait for space instead of another task and reduce size to 1000 2025-04-25 18:24:34 +01:00
Adam Hathcock d69f0bba2a fmt 2025-04-25 18:13:05 +01:00
Adam Hathcock 33c14fc14c Remove extras 2025-04-25 18:09:04 +01:00
Adam Hathcock 536e58aacc Don't drop items to write when sending fast 2025-04-25 17:45:01 +01:00
Adam Hathcock 377829adae fix(main) exception test correction and token usage (#283)
.NET Build and Publish / build (push) Has been cancelled
* add parallelism on exception after count test

* use scoped token source correctly

* format

* Centralized token usage and made sqlite busy timeout be 5 seconds

* restore write parallelism to 4

* add to comment
2025-04-24 10:40:46 +00:00
Adam Hathcock cc9639b179 Merge pull request #282 from specklesystems/adam/error-fix
fix(main) Wrong error message being displayed in UI
2025-04-24 11:29:10 +01:00
Adam Hathcock b733ce5f29 fix snapshot test message 2025-04-22 11:09:50 +01:00
Adam Hathcock 1c8b2b82d7 Wrong error message being displayed in UI 2025-04-22 10:13:59 +01:00
Jedd Morgan 11cd2dc1cb Update ProjectResourceExceptionalTests.cs (#279) 2025-04-11 12:27:04 +00:00
11 changed files with 101 additions and 49 deletions
@@ -8,7 +8,7 @@ namespace Speckle.Sdk.Dependencies.Serialization;
public abstract class ChannelSaver<T> public abstract class ChannelSaver<T>
where T : IHasByteSize where T : IHasByteSize
{ {
private const int SEND_CAPACITY = 500; private const int SEND_CAPACITY = 1000;
private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2); private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
private const int MAX_PARALLELISM_HTTP = 4; private const int MAX_PARALLELISM_HTTP = 4;
@@ -28,21 +28,26 @@ public abstract class ChannelSaver<T>
_ => throw new NotImplementedException("Dropping items not supported.") _ => throw new NotImplementedException("Dropping items not supported.")
); );
public Task Start(CancellationToken cancellationToken) => public Task Start(
int? maxParallelism,
int? httpBatchSize,
int? cacheBatchSize,
CancellationToken cancellationToken
) =>
_checkCacheChannel _checkCacheChannel
.Reader.BatchByByteSize(HTTP_SEND_CHUNK_SIZE) .Reader.BatchByByteSize(httpBatchSize ?? HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT) .WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync( .PipeAsync(
MAX_PARALLELISM_HTTP, maxParallelism ?? MAX_PARALLELISM_HTTP,
async x => await SendToServer(x).ConfigureAwait(false), async x => await SendToServer(x).ConfigureAwait(false),
HTTP_CAPACITY, HTTP_CAPACITY,
false, false,
cancellationToken cancellationToken
) )
.Join() .Join()
.Batch(MAX_CACHE_BATCH) .Batch(cacheBatchSize ?? MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT) .WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken) .ReadAllConcurrently(maxParallelism ?? MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
.ContinueWith( .ContinueWith(
t => t =>
{ {
@@ -63,14 +68,15 @@ public abstract class ChannelSaver<T>
TaskScheduler.Current TaskScheduler.Current
); );
public void Save(T item, CancellationToken cancellationToken) public async Task SaveAsync(T item, CancellationToken cancellationToken)
{ {
if (Exception is not null || cancellationToken.IsCancellationRequested) if (Exception is not null)
{ {
return; //don't save if we're already done through an error return; //don't save if we're already done through an error
} }
// ReSharper disable once MethodSupportsCancellation //can switch to check then try pattern when back pressure is needed or exceptions are too much
_checkCacheChannel.Writer.TryWrite(item); //the trees don't need to respond to back pressure
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
} }
private async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch) private async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch)
@@ -72,9 +72,14 @@ public sealed class SqLiteJsonCacheManager : ISqLiteJsonCacheManager
cmd4.ExecuteNonQuery(); cmd4.ExecuteNonQuery();
} }
using (SqliteCommand cmd0 = new("PRAGMA journal_mode='wal';", c)) using (SqliteCommand cmd5 = new("PRAGMA journal_mode='wal';", c))
{ {
cmd0.ExecuteNonQuery(); cmd5.ExecuteNonQuery();
}
//do this to wait 5 seconds to avoid db lock exceptions, this is 0 by default
using (SqliteCommand cmd6 = new("PRAGMA busy_timeout=5000;", c))
{
cmd6.ExecuteNonQuery();
} }
}); });
} }
@@ -146,7 +146,7 @@ public sealed class ObjectLoader(
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
if (Exception is not null) if (Exception is not null)
{ {
throw new SpeckleException("Error while sending", Exception); throw new SpeckleException("Error while loading", Exception);
} }
} }
} }
@@ -9,10 +9,10 @@ namespace Speckle.Sdk.Serialisation.V2.Send;
public interface IObjectSaver : IDisposable public interface IObjectSaver : IDisposable
{ {
Exception? Exception { get; set; } Exception? Exception { get; set; }
Task Start(CancellationToken cancellationToken); Task Start(int? maxParallelism, int? httpBatchSize, int? cacheBatchSize, CancellationToken cancellationToken);
void DoneTraversing(); void DoneTraversing();
Task DoneSaving(); Task DoneSaving();
void SaveItem(BaseItem item, CancellationToken cancellationToken); Task SaveAsync(BaseItem item);
} }
public sealed class ObjectSaver( public sealed class ObjectSaver(
@@ -77,10 +77,10 @@ public sealed class ObjectSaver(
} }
} }
public void SaveItem(BaseItem item, CancellationToken cancellationToken) public async Task SaveAsync(BaseItem item)
{ {
Interlocked.Increment(ref _objectsSerialized); Interlocked.Increment(ref _objectsSerialized);
Save(item, cancellationToken); await SaveAsync(item, _cancellationTokenSource.Token).ConfigureAwait(false);
} }
public override void SaveToCache(List<BaseItem> batch) public override void SaveToCache(List<BaseItem> batch)
@@ -14,7 +14,12 @@ public record SerializeProcessOptions(
bool SkipCacheWrite = false, bool SkipCacheWrite = false,
bool SkipServer = false, bool SkipServer = false,
bool SkipFindTotalObjects = false bool SkipFindTotalObjects = false
); )
{
public int? MaxHttpSendSize { get; set; }
public int? MaxCacheSize { get; set; }
public int? MaxParallelism { get; set; }
}
public readonly record struct SerializeProcessResults( public readonly record struct SerializeProcessResults(
string RootId, string RootId,
@@ -99,7 +104,12 @@ public sealed class SerializeProcess(
{ {
try try
{ {
var channelTask = objectSaver.Start(_processSource.Token); var channelTask = objectSaver.Start(
options?.MaxParallelism,
options?.MaxHttpSendSize,
options?.MaxCacheSize,
_processSource.Token
);
var findTotalObjectsTask = Task.CompletedTask; var findTotalObjectsTask = Task.CompletedTask;
if (!_options.SkipFindTotalObjects) if (!_options.SkipFindTotalObjects)
{ {
@@ -112,7 +122,7 @@ public sealed class SerializeProcess(
); );
} }
await Traverse(root, _processSource.Token).ConfigureAwait(false); await Traverse(root).ConfigureAwait(false);
ThrowIfFailed(); ThrowIfFailed();
objectSaver.DoneTraversing(); objectSaver.DoneTraversing();
await Task.WhenAll(findTotalObjectsTask, channelTask).ConfigureAwait(false); await Task.WhenAll(findTotalObjectsTask, channelTask).ConfigureAwait(false);
@@ -123,7 +133,7 @@ public sealed class SerializeProcess(
ThrowIfFailed(); ThrowIfFailed();
return new(root.id.NotNull(), baseSerializer.ObjectReferences.Freeze()); return new(root.id.NotNull(), baseSerializer.ObjectReferences.Freeze());
} }
catch (TaskCanceledException) catch (OperationCanceledException)
{ {
ThrowIfFailed(); ThrowIfFailed();
throw; throw;
@@ -138,15 +148,15 @@ public sealed class SerializeProcess(
} }
foreach (var child in baseChildFinder.GetChildren(obj)) foreach (var child in baseChildFinder.GetChildren(obj))
{ {
_objectsFound++; Interlocked.Increment(ref _objectsFound);
progress?.Report(new(ProgressEvent.FindingChildren, _objectsFound, null)); progress?.Report(new(ProgressEvent.FindingChildren, _objectsFound, null));
TraverseTotal(child); TraverseTotal(child);
} }
} }
private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj, CancellationToken token) private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj)
{ {
if (token.IsCancellationRequested) if (_processSource.Token.IsCancellationRequested)
{ {
return EMPTY_CLOSURES; return EMPTY_CLOSURES;
} }
@@ -154,12 +164,11 @@ public sealed class SerializeProcess(
try try
{ {
var tasks = new List<Task<Dictionary<Id, NodeInfo>>>(); var tasks = new List<Task<Dictionary<Id, NodeInfo>>>();
using var childCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
foreach (var child in baseChildFinder.GetChildren(obj)) foreach (var child in baseChildFinder.GetChildren(obj))
{ {
// tmp is necessary because of the way closures close over loop variables // tmp is necessary because of the way closures close over loop variables
var tmp = child; var tmp = child;
if (token.IsCancellationRequested) if (_processSource.Token.IsCancellationRequested)
{ {
return EMPTY_CLOSURES; return EMPTY_CLOSURES;
} }
@@ -167,8 +176,8 @@ public sealed class SerializeProcess(
.Factory.StartNew( .Factory.StartNew(
// ReSharper disable once AccessToDisposedClosure // ReSharper disable once AccessToDisposedClosure
// don't need to capture here // don't need to capture here
async () => await Traverse(tmp, childCancellationTokenSource.Token).ConfigureAwait(false), async () => await Traverse(tmp).ConfigureAwait(false),
childCancellationTokenSource.Token, _processSource.Token,
TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness, TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness,
_belowNormal _belowNormal
) )
@@ -176,7 +185,7 @@ public sealed class SerializeProcess(
tasks.Add(t); tasks.Add(t);
} }
if (token.IsCancellationRequested) if (_processSource.Token.IsCancellationRequested)
{ {
return EMPTY_CLOSURES; return EMPTY_CLOSURES;
} }
@@ -206,7 +215,7 @@ public sealed class SerializeProcess(
} while (currentTasks.Count > 0); } while (currentTasks.Count > 0);
} }
if (token.IsCancellationRequested) if (_processSource.Token.IsCancellationRequested)
{ {
return EMPTY_CLOSURES; return EMPTY_CLOSURES;
} }
@@ -222,13 +231,14 @@ public sealed class SerializeProcess(
_currentClosurePool.Return(childClosure); _currentClosurePool.Return(childClosure);
} }
if (token.IsCancellationRequested) if (_processSource.Token.IsCancellationRequested)
{ {
return EMPTY_CLOSURES; return EMPTY_CLOSURES;
} }
var items = baseSerializer.Serialise(obj, childClosures, _options.SkipCacheRead, token); var items = baseSerializer.Serialise(obj, childClosures, _options.SkipCacheRead, _processSource.Token);
if (token.IsCancellationRequested)
if (_processSource.Token.IsCancellationRequested)
{ {
return EMPTY_CLOSURES; return EMPTY_CLOSURES;
} }
@@ -242,13 +252,13 @@ public sealed class SerializeProcess(
{ {
if (item.NeedsStorage) if (item.NeedsStorage)
{ {
if (token.IsCancellationRequested) if (_processSource.Token.IsCancellationRequested)
{ {
return EMPTY_CLOSURES; return EMPTY_CLOSURES;
} }
Interlocked.Increment(ref _objectsSerialized); Interlocked.Increment(ref _objectsSerialized);
objectSaver.SaveItem(item, childCancellationTokenSource.Token); await objectSaver.SaveAsync(item).ConfigureAwait(false);
} }
if (!currentClosures.ContainsKey(item.Id)) if (!currentClosures.ContainsKey(item.Id))
@@ -1,10 +1,10 @@
{ {
"Data": {}, "Data": {},
"InnerException": { "InnerException": {
"$type": "NotImplementedException", "$type": "Exception",
"Data": {}, "Data": {},
"Message": "The method or operation is not implemented.", "Message": "Count exceeded",
"Type": "NotImplementedException" "Type": "Exception"
}, },
"Message": "Error while sending", "Message": "Error while sending",
"Type": "SpeckleException" "Type": "SpeckleException"
@@ -6,6 +6,6 @@
"Message": "The method or operation is not implemented.", "Message": "The method or operation is not implemented.",
"Type": "NotImplementedException" "Type": "NotImplementedException"
}, },
"Message": "Error while sending", "Message": "Error while loading",
"Type": "SpeckleException" "Type": "SpeckleException"
} }
@@ -66,7 +66,27 @@ public class ExceptionTests
[Fact] [Fact]
public async Task Test_Exceptions_Cache_ExceptionsAfter_10() public async Task Test_Exceptions_Cache_ExceptionsAfter_10()
{ {
var testClass = new TestClass() { RegularProperty = "Hello" }; var @base = new SampleObjectBase2();
@base["dynamicProp"] = 123;
@base.applicationId = "1";
@base.detachedProp = new SamplePropBase2()
{
name = "detachedProp",
applicationId = "2",
line = new Polyline() { units = "test", value = [1.0, 2.0] },
};
@base.detachedProp2 = new SamplePropBase2()
{
name = "detachedProp2",
applicationId = "3",
line = new Polyline() { units = "test", value = [3.0, 2.0] },
};
@base.attachedProp = new SamplePropBase2()
{
name = "attachedProp",
applicationId = "4",
line = new Polyline() { units = "test", value = [3.0, 4.0] },
};
await using var serializeProcess = _factory.CreateSerializeProcess( await using var serializeProcess = _factory.CreateSerializeProcess(
new ExceptionSendCacheManager(exceptionsAfter: 10), new ExceptionSendCacheManager(exceptionsAfter: 10),
@@ -74,9 +94,14 @@ public class ExceptionTests
null, null,
default, default,
new SerializeProcessOptions(false, false, false, true) new SerializeProcessOptions(false, false, false, true)
{
MaxHttpSendSize = 1,
MaxCacheSize = 1,
MaxParallelism = 1,
}
); );
var ex = await Assert.ThrowsAsync<SpeckleException>(async () => await serializeProcess.Serialize(testClass)); var ex = await Assert.ThrowsAsync<SpeckleException>(async () => await serializeProcess.Serialize(@base));
await Verify(ex); await Verify(ex);
} }
@@ -4,6 +4,7 @@ namespace Speckle.Sdk.Serialization.Tests.Framework;
public class ExceptionSendCacheManager(bool? hasObject = null, int? exceptionsAfter = null) : ISqLiteJsonCacheManager public class ExceptionSendCacheManager(bool? hasObject = null, int? exceptionsAfter = null) : ISqLiteJsonCacheManager
{ {
private readonly object _lock = new();
private int _count; private int _count;
public void Dispose() { } public void Dispose() { }
@@ -24,18 +25,23 @@ public class ExceptionSendCacheManager(bool? hasObject = null, int? exceptionsAf
private void CheckExceptions() private void CheckExceptions()
{ {
if (exceptionsAfter is not null) lock (_lock)
{ {
if (exceptionsAfter.Value > _count) if (exceptionsAfter is not null)
{ {
_count++; if (exceptionsAfter.Value > _count)
{
_count++;
}
else
{
throw new Exception("Count exceeded");
}
} }
else else
{ {
throw new Exception("Count exceeded"); throw new NotImplementedException();
} }
} }
throw new NotImplementedException();
} }
} }
@@ -100,7 +100,7 @@ public class ModelResourceExceptionalTests : IAsyncLifetime
.Invoking(async () => await Sut.Update(input)) .Invoking(async () => await Sut.Update(input))
.Should() .Should()
.ThrowAsync<AggregateException>(); .ThrowAsync<AggregateException>();
ex.WithInnerExceptionExactly<SpeckleGraphQLForbiddenException>(); ex.WithInnerExceptionExactly<SpeckleGraphQLStreamNotFoundException>();
} }
[Fact] [Fact]
@@ -73,7 +73,7 @@ public class ProjectResourceExceptionalTests : IAsyncLifetime
var ex = await Assert.ThrowsAsync<AggregateException>( var ex = await Assert.ThrowsAsync<AggregateException>(
async () => _ = await Sut.Update(new("NonExistentProject", "My new name")) async () => _ = await Sut.Update(new("NonExistentProject", "My new name"))
); );
ex.InnerExceptions.Single().Should().BeOfType<SpeckleGraphQLForbiddenException>(); ex.InnerExceptions.Single().Should().BeOfType<SpeckleGraphQLStreamNotFoundException>();
} }
[Fact] [Fact]