Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 377829adae | |||
| cc9639b179 | |||
| b733ce5f29 | |||
| 1c8b2b82d7 | |||
| 11cd2dc1cb |
@@ -28,21 +28,26 @@ public abstract class ChannelSaver<T>
|
||||
_ => throw new NotImplementedException("Dropping items not supported.")
|
||||
);
|
||||
|
||||
public Task Start(CancellationToken cancellationToken) =>
|
||||
public Task Start(
|
||||
int? maxParallelism,
|
||||
int? httpBatchSize,
|
||||
int? cacheBatchSize,
|
||||
CancellationToken cancellationToken
|
||||
) =>
|
||||
_checkCacheChannel
|
||||
.Reader.BatchByByteSize(HTTP_SEND_CHUNK_SIZE)
|
||||
.Reader.BatchByByteSize(httpBatchSize ?? HTTP_SEND_CHUNK_SIZE)
|
||||
.WithTimeout(HTTP_BATCH_TIMEOUT)
|
||||
.PipeAsync(
|
||||
MAX_PARALLELISM_HTTP,
|
||||
maxParallelism ?? MAX_PARALLELISM_HTTP,
|
||||
async x => await SendToServer(x).ConfigureAwait(false),
|
||||
HTTP_CAPACITY,
|
||||
false,
|
||||
cancellationToken
|
||||
)
|
||||
.Join()
|
||||
.Batch(MAX_CACHE_BATCH)
|
||||
.Batch(cacheBatchSize ?? MAX_CACHE_BATCH)
|
||||
.WithTimeout(HTTP_BATCH_TIMEOUT)
|
||||
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
|
||||
.ReadAllConcurrently(maxParallelism ?? MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
|
||||
.ContinueWith(
|
||||
t =>
|
||||
{
|
||||
@@ -63,9 +68,9 @@ public abstract class ChannelSaver<T>
|
||||
TaskScheduler.Current
|
||||
);
|
||||
|
||||
public void Save(T item, CancellationToken cancellationToken)
|
||||
public void Save(T item)
|
||||
{
|
||||
if (Exception is not null || cancellationToken.IsCancellationRequested)
|
||||
if (Exception is not null)
|
||||
{
|
||||
return; //don't save if we're already done through an error
|
||||
}
|
||||
|
||||
@@ -72,9 +72,14 @@ public sealed class SqLiteJsonCacheManager : ISqLiteJsonCacheManager
|
||||
cmd4.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
using (SqliteCommand cmd0 = new("PRAGMA journal_mode='wal';", c))
|
||||
using (SqliteCommand cmd5 = new("PRAGMA journal_mode='wal';", c))
|
||||
{
|
||||
cmd0.ExecuteNonQuery();
|
||||
cmd5.ExecuteNonQuery();
|
||||
}
|
||||
//do this to wait 5 seconds to avoid db lock exceptions, this is 0 by default
|
||||
using (SqliteCommand cmd6 = new("PRAGMA busy_timeout=5000;", c))
|
||||
{
|
||||
cmd6.ExecuteNonQuery();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -146,7 +146,7 @@ public sealed class ObjectLoader(
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
if (Exception is not null)
|
||||
{
|
||||
throw new SpeckleException("Error while sending", Exception);
|
||||
throw new SpeckleException("Error while loading", Exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,10 +9,12 @@ namespace Speckle.Sdk.Serialisation.V2.Send;
|
||||
public interface IObjectSaver : IDisposable
|
||||
{
|
||||
Exception? Exception { get; set; }
|
||||
Task Start(CancellationToken cancellationToken);
|
||||
Task Start(int? maxParallelism, int? httpBatchSize, int? cacheBatchSize, CancellationToken cancellationToken);
|
||||
void DoneTraversing();
|
||||
Task DoneSaving();
|
||||
void SaveItem(BaseItem item, CancellationToken cancellationToken);
|
||||
void SaveItem(BaseItem item);
|
||||
long Uploaded { get; }
|
||||
long Cached { get; }
|
||||
}
|
||||
|
||||
public sealed class ObjectSaver(
|
||||
@@ -38,6 +40,8 @@ public sealed class ObjectSaver(
|
||||
private long _cached;
|
||||
|
||||
private long _objectsSerialized;
|
||||
public long Cached => _cached;
|
||||
public long Uploaded => _uploaded;
|
||||
|
||||
protected override async Task SendToServerInternal(Batch<BaseItem> batch)
|
||||
{
|
||||
@@ -77,10 +81,10 @@ public sealed class ObjectSaver(
|
||||
}
|
||||
}
|
||||
|
||||
public void SaveItem(BaseItem item, CancellationToken cancellationToken)
|
||||
public void SaveItem(BaseItem item)
|
||||
{
|
||||
Interlocked.Increment(ref _objectsSerialized);
|
||||
Save(item, cancellationToken);
|
||||
Save(item);
|
||||
}
|
||||
|
||||
public override void SaveToCache(List<BaseItem> batch)
|
||||
|
||||
@@ -14,7 +14,12 @@ public record SerializeProcessOptions(
|
||||
bool SkipCacheWrite = false,
|
||||
bool SkipServer = false,
|
||||
bool SkipFindTotalObjects = false
|
||||
);
|
||||
)
|
||||
{
|
||||
public int? MaxHttpSendSize { get; set; }
|
||||
public int? MaxCacheSize { get; set; }
|
||||
public int? MaxParallelism { get; set; }
|
||||
}
|
||||
|
||||
public readonly record struct SerializeProcessResults(
|
||||
string RootId,
|
||||
@@ -99,7 +104,12 @@ public sealed class SerializeProcess(
|
||||
{
|
||||
try
|
||||
{
|
||||
var channelTask = objectSaver.Start(_processSource.Token);
|
||||
var channelTask = objectSaver.Start(
|
||||
options?.MaxParallelism,
|
||||
options?.MaxHttpSendSize,
|
||||
options?.MaxCacheSize,
|
||||
_processSource.Token
|
||||
);
|
||||
var findTotalObjectsTask = Task.CompletedTask;
|
||||
if (!_options.SkipFindTotalObjects)
|
||||
{
|
||||
@@ -112,7 +122,7 @@ public sealed class SerializeProcess(
|
||||
);
|
||||
}
|
||||
|
||||
await Traverse(root, _processSource.Token).ConfigureAwait(false);
|
||||
await Traverse(root).ConfigureAwait(false);
|
||||
ThrowIfFailed();
|
||||
objectSaver.DoneTraversing();
|
||||
await Task.WhenAll(findTotalObjectsTask, channelTask).ConfigureAwait(false);
|
||||
@@ -123,7 +133,7 @@ public sealed class SerializeProcess(
|
||||
ThrowIfFailed();
|
||||
return new(root.id.NotNull(), baseSerializer.ObjectReferences.Freeze());
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
ThrowIfFailed();
|
||||
throw;
|
||||
@@ -138,15 +148,15 @@ public sealed class SerializeProcess(
|
||||
}
|
||||
foreach (var child in baseChildFinder.GetChildren(obj))
|
||||
{
|
||||
_objectsFound++;
|
||||
Interlocked.Increment(ref _objectsFound);
|
||||
progress?.Report(new(ProgressEvent.FindingChildren, _objectsFound, null));
|
||||
TraverseTotal(child);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj, CancellationToken token)
|
||||
private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj)
|
||||
{
|
||||
if (token.IsCancellationRequested)
|
||||
if (_processSource.Token.IsCancellationRequested)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
@@ -154,12 +164,11 @@ public sealed class SerializeProcess(
|
||||
try
|
||||
{
|
||||
var tasks = new List<Task<Dictionary<Id, NodeInfo>>>();
|
||||
using var childCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
|
||||
foreach (var child in baseChildFinder.GetChildren(obj))
|
||||
{
|
||||
// tmp is necessary because of the way closures close over loop variables
|
||||
var tmp = child;
|
||||
if (token.IsCancellationRequested)
|
||||
if (_processSource.Token.IsCancellationRequested)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
@@ -167,8 +176,8 @@ public sealed class SerializeProcess(
|
||||
.Factory.StartNew(
|
||||
// ReSharper disable once AccessToDisposedClosure
|
||||
// don't need to capture here
|
||||
async () => await Traverse(tmp, childCancellationTokenSource.Token).ConfigureAwait(false),
|
||||
childCancellationTokenSource.Token,
|
||||
async () => await Traverse(tmp).ConfigureAwait(false),
|
||||
_processSource.Token,
|
||||
TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness,
|
||||
_belowNormal
|
||||
)
|
||||
@@ -176,7 +185,7 @@ public sealed class SerializeProcess(
|
||||
tasks.Add(t);
|
||||
}
|
||||
|
||||
if (token.IsCancellationRequested)
|
||||
if (_processSource.Token.IsCancellationRequested)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
@@ -206,7 +215,7 @@ public sealed class SerializeProcess(
|
||||
} while (currentTasks.Count > 0);
|
||||
}
|
||||
|
||||
if (token.IsCancellationRequested)
|
||||
if (_processSource.Token.IsCancellationRequested)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
@@ -222,13 +231,14 @@ public sealed class SerializeProcess(
|
||||
_currentClosurePool.Return(childClosure);
|
||||
}
|
||||
|
||||
if (token.IsCancellationRequested)
|
||||
if (_processSource.Token.IsCancellationRequested)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
|
||||
var items = baseSerializer.Serialise(obj, childClosures, _options.SkipCacheRead, token);
|
||||
if (token.IsCancellationRequested)
|
||||
var items = baseSerializer.Serialise(obj, childClosures, _options.SkipCacheRead, _processSource.Token);
|
||||
|
||||
if (_processSource.Token.IsCancellationRequested)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
@@ -242,13 +252,13 @@ public sealed class SerializeProcess(
|
||||
{
|
||||
if (item.NeedsStorage)
|
||||
{
|
||||
if (token.IsCancellationRequested)
|
||||
if (_processSource.Token.IsCancellationRequested)
|
||||
{
|
||||
return EMPTY_CLOSURES;
|
||||
}
|
||||
|
||||
Interlocked.Increment(ref _objectsSerialized);
|
||||
objectSaver.SaveItem(item, childCancellationTokenSource.Token);
|
||||
objectSaver.SaveItem(item);
|
||||
}
|
||||
|
||||
if (!currentClosures.ContainsKey(item.Id))
|
||||
|
||||
+3
-3
@@ -1,10 +1,10 @@
|
||||
{
|
||||
"Data": {},
|
||||
"InnerException": {
|
||||
"$type": "NotImplementedException",
|
||||
"$type": "Exception",
|
||||
"Data": {},
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Type": "NotImplementedException"
|
||||
"Message": "Count exceeded",
|
||||
"Type": "Exception"
|
||||
},
|
||||
"Message": "Error while sending",
|
||||
"Type": "SpeckleException"
|
||||
|
||||
+1
-1
@@ -6,6 +6,6 @@
|
||||
"Message": "The method or operation is not implemented.",
|
||||
"Type": "NotImplementedException"
|
||||
},
|
||||
"Message": "Error while sending",
|
||||
"Message": "Error while loading",
|
||||
"Type": "SpeckleException"
|
||||
}
|
||||
|
||||
@@ -66,7 +66,27 @@ public class ExceptionTests
|
||||
[Fact]
|
||||
public async Task Test_Exceptions_Cache_ExceptionsAfter_10()
|
||||
{
|
||||
var testClass = new TestClass() { RegularProperty = "Hello" };
|
||||
var @base = new SampleObjectBase2();
|
||||
@base["dynamicProp"] = 123;
|
||||
@base.applicationId = "1";
|
||||
@base.detachedProp = new SamplePropBase2()
|
||||
{
|
||||
name = "detachedProp",
|
||||
applicationId = "2",
|
||||
line = new Polyline() { units = "test", value = [1.0, 2.0] },
|
||||
};
|
||||
@base.detachedProp2 = new SamplePropBase2()
|
||||
{
|
||||
name = "detachedProp2",
|
||||
applicationId = "3",
|
||||
line = new Polyline() { units = "test", value = [3.0, 2.0] },
|
||||
};
|
||||
@base.attachedProp = new SamplePropBase2()
|
||||
{
|
||||
name = "attachedProp",
|
||||
applicationId = "4",
|
||||
line = new Polyline() { units = "test", value = [3.0, 4.0] },
|
||||
};
|
||||
|
||||
await using var serializeProcess = _factory.CreateSerializeProcess(
|
||||
new ExceptionSendCacheManager(exceptionsAfter: 10),
|
||||
@@ -74,9 +94,14 @@ public class ExceptionTests
|
||||
null,
|
||||
default,
|
||||
new SerializeProcessOptions(false, false, false, true)
|
||||
{
|
||||
MaxHttpSendSize = 1,
|
||||
MaxCacheSize = 1,
|
||||
MaxParallelism = 1,
|
||||
}
|
||||
);
|
||||
|
||||
var ex = await Assert.ThrowsAsync<SpeckleException>(async () => await serializeProcess.Serialize(testClass));
|
||||
var ex = await Assert.ThrowsAsync<SpeckleException>(async () => await serializeProcess.Serialize(@base));
|
||||
await Verify(ex);
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ namespace Speckle.Sdk.Serialization.Tests.Framework;
|
||||
|
||||
public class ExceptionSendCacheManager(bool? hasObject = null, int? exceptionsAfter = null) : ISqLiteJsonCacheManager
|
||||
{
|
||||
private readonly object _lock = new();
|
||||
private int _count;
|
||||
|
||||
public void Dispose() { }
|
||||
@@ -24,18 +25,23 @@ public class ExceptionSendCacheManager(bool? hasObject = null, int? exceptionsAf
|
||||
|
||||
private void CheckExceptions()
|
||||
{
|
||||
if (exceptionsAfter is not null)
|
||||
lock (_lock)
|
||||
{
|
||||
if (exceptionsAfter.Value > _count)
|
||||
if (exceptionsAfter is not null)
|
||||
{
|
||||
_count++;
|
||||
if (exceptionsAfter.Value > _count)
|
||||
{
|
||||
_count++;
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new Exception("Count exceeded");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new Exception("Count exceeded");
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
|
||||
+1
-1
@@ -100,7 +100,7 @@ public class ModelResourceExceptionalTests : IAsyncLifetime
|
||||
.Invoking(async () => await Sut.Update(input))
|
||||
.Should()
|
||||
.ThrowAsync<AggregateException>();
|
||||
ex.WithInnerExceptionExactly<SpeckleGraphQLForbiddenException>();
|
||||
ex.WithInnerExceptionExactly<SpeckleGraphQLStreamNotFoundException>();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
||||
+1
-1
@@ -73,7 +73,7 @@ public class ProjectResourceExceptionalTests : IAsyncLifetime
|
||||
var ex = await Assert.ThrowsAsync<AggregateException>(
|
||||
async () => _ = await Sut.Update(new("NonExistentProject", "My new name"))
|
||||
);
|
||||
ex.InnerExceptions.Single().Should().BeOfType<SpeckleGraphQLForbiddenException>();
|
||||
ex.InnerExceptions.Single().Should().BeOfType<SpeckleGraphQLStreamNotFoundException>();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
||||
Reference in New Issue
Block a user