Add cancellation tests (#218)

* Don't log cancelling

* redo exception handling for receive

* remove null test case

* clean up with Id/Json and more cancels

* Change the exception stacks

* fix serialization test

* make a custom scrubber for internalized exceptions

* clean up

* fix namespaces again :(

* adjust the scrubber

* try to make tests more predictable

* rework exceptions again

* strip out compile files used

* formatting

* custom exception validation

* fix init

* Move serialization to own class

* save serialize test

* add deep clean

* add cancellation test on save to cache

* cancellation tests

* format

* do DI correctly

* receive cancel works
This commit is contained in:
Adam Hathcock
2025-01-30 13:42:15 +00:00
committed by GitHub
parent 73afa28026
commit 3aa993cecb
52 changed files with 1036 additions and 364 deletions
@@ -0,0 +1,32 @@
using Speckle.Sdk.Common;
namespace Speckle.Sdk.Testing.Framework;
public class AggregationExceptionScrubber : WriteOnlyJsonConverter<AggregateException>
{
private static readonly ExceptionScrubber _innerScrubber = new();
public override void Write(VerifyJsonWriter writer, AggregateException exception)
{
writer.WriteStartObject();
writer.WriteMember(exception, exception.GetType().FullName, "Type");
if (exception.InnerExceptions.Count == 1)
{
writer.WritePropertyName("InnerException");
_innerScrubber.Write(writer, exception.InnerException.NotNull());
}
else
{
writer.WritePropertyName("InnerExceptions");
writer.WriteStartArray();
foreach (var innerException in exception.InnerExceptions)
{
_innerScrubber.Write(writer, innerException);
}
writer.WriteEndArray();
}
writer.WriteEndObject();
}
}
@@ -6,7 +6,7 @@ using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Testing.Framework;
public class DummyReceiveServerObjectManager(Dictionary<string, string> objects) : IServerObjectManager
public class DummyReceiveServerObjectManager(IReadOnlyDictionary<string, string> objects) : IServerObjectManager
{
public async IAsyncEnumerable<(string, string)> DownloadObjects(
IReadOnlyCollection<string> objectIds,
@@ -2,7 +2,8 @@
namespace Speckle.Sdk.Testing.Framework;
public sealed class DummySqLiteReceiveManager(Dictionary<string, string> savedObjects) : ISqLiteJsonCacheManager
public sealed class DummySqLiteReceiveManager(IReadOnlyDictionary<string, string> savedObjects)
: ISqLiteJsonCacheManager
{
public void Dispose() { }
@@ -2,7 +2,7 @@
namespace Speckle.Sdk.Testing.Framework;
public sealed class DummySqLiteSendManager : ISqLiteJsonCacheManager
public class DummySqLiteSendManager : ISqLiteJsonCacheManager
{
public string? GetObject(string id) => throw new NotImplementedException();
@@ -10,7 +10,7 @@ public sealed class DummySqLiteSendManager : ISqLiteJsonCacheManager
public void UpdateObject(string id, string json) => throw new NotImplementedException();
public void SaveObjects(IEnumerable<(string id, string json)> items) => throw new NotImplementedException();
public virtual void SaveObjects(IEnumerable<(string id, string json)> items) => throw new NotImplementedException();
public bool HasObject(string objectId) => throw new NotImplementedException();
@@ -0,0 +1,24 @@
using Argon;
namespace Speckle.Sdk.Testing.Framework;
public class ExceptionScrubber : WriteOnlyJsonConverter<Exception>
{
public ExceptionScrubber() { }
public override void Write(VerifyJsonWriter writer, Exception value)
{
if (value.StackTrace != null)
{
var ex = new JObject
{
["Type"] = value.GetType().FullName,
["Message"] = value.Message,
["Source"] = value.Source?.Trim(),
};
writer.WriteRawValue(ex.ToString(Formatting.Indented));
return;
}
base.Write(writer, value.ToString());
}
}
+15 -3
View File
@@ -7,9 +7,17 @@ namespace Speckle.Sdk.Testing;
public static class SpeckleVerify
{
private static bool _initialized;
[ModuleInitializer]
public static void Initialize()
{
if (_initialized)
{
return;
}
_initialized = true;
VerifierSettings.DontScrubGuids();
VerifierSettings.DontScrubDateTimes();
@@ -17,10 +25,14 @@ public static class SpeckleVerify
VerifierSettings.DontIgnoreEmptyCollections();
VerifierSettings.SortPropertiesAlphabetically();
VerifierSettings.SortJsonObjects();
if (!VerifyQuibble.Initialized)
VerifierSettings.AddExtraSettings(x =>
{
VerifyQuibble.Initialize();
}
var existing = x.Converters.OfType<WriteOnlyJsonConverter<AggregateException>>().First();
x.Converters.Remove(existing);
x.Converters.Add(new AggregationExceptionScrubber());
x.Converters.Add(new ExceptionScrubber());
});
VerifyQuibble.Initialize();
}
private static readonly JsonSerializer _jsonSerializer = new()
+28
View File
@@ -14,6 +14,7 @@ const string PACK = "pack";
const string PACK_LOCAL = "pack-local";
const string CLEAN_LOCKS = "clean-locks";
const string PERF = "perf";
const string DEEP_CLEAN = "deep-clean";
Target(
CLEAN_LOCKS,
@@ -124,6 +125,33 @@ Target(
}
);
Target(
DEEP_CLEAN,
() =>
{
foreach (var f in Glob.Directories(".", "**/bin"))
{
if (f.StartsWith("build"))
{
continue;
}
Console.WriteLine("Found and will delete: " + f);
Directory.Delete(f, true);
}
foreach (var f in Glob.Directories(".", "**/obj"))
{
if (f.StartsWith("Build"))
{
continue;
}
Console.WriteLine("Found and will delete: " + f);
Directory.Delete(f, true);
}
Console.WriteLine("Running restore now.");
Run("dotnet", "restore .\\Speckle.Sdk.sln --no-cache");
}
);
static Task RunPack() => RunAsync("dotnet", "pack Speckle.Sdk.sln -c Release -o output --no-build");
Target(PACK, DependsOn(TEST), RunPack);
+3 -1
View File
@@ -6,7 +6,9 @@ public static class Collections
{
public static IReadOnlyCollection<T> Freeze<T>(this IEnumerable<T> source) => source.ToFrozenSet();
public static IReadOnlyDictionary<TKey, TValue> Freeze<TKey, TValue>(this IDictionary<TKey, TValue> source)
public static IReadOnlyDictionary<TKey, TValue> Freeze<TKey, TValue>(
this IEnumerable<KeyValuePair<TKey, TValue>> source
)
where TKey : notnull => source.ToFrozenDictionary();
}
@@ -1,30 +1,90 @@
using Open.ChannelExtensions;
using System.Threading.Channels;
using Open.ChannelExtensions;
namespace Speckle.Sdk.Dependencies.Serialization;
public abstract class ChannelLoader<T>
{
private const int RECEIVE_CAPACITY = 5000;
private const int HTTP_GET_CHUNK_SIZE = 500;
private const int MAX_PARALLELISM_HTTP = 4;
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
private static readonly int MAX_READ_CACHE_PARALLELISM = Environment.ProcessorCount;
private const int MAX_SAVE_CACHE_BATCH = 500;
private const int MAX_SAVE_CACHE_PARALLELISM = 4;
protected async Task GetAndCache(IEnumerable<string> allChildrenIds, CancellationToken cancellationToken) =>
await allChildrenIds
.ToChannel(cancellationToken: cancellationToken)
.Pipe(MAX_READ_CACHE_PARALLELISM, CheckCache, cancellationToken: cancellationToken)
private readonly List<Exception> _exceptions = new();
private readonly Channel<string> _channel = Channel.CreateBounded<string>(
new BoundedChannelOptions(RECEIVE_CAPACITY)
{
AllowSynchronousContinuations = true,
Capacity = RECEIVE_CAPACITY,
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait,
},
_ => throw new NotImplementedException("Dropping items not supported.")
);
protected async Task GetAndCache(
IEnumerable<string> allChildrenIds,
CancellationToken cancellationToken,
int? maxParallelism = null
) =>
await _channel
.Source(allChildrenIds, cancellationToken)
.Pipe(maxParallelism ?? Environment.ProcessorCount, CheckCache, cancellationToken: cancellationToken)
.Filter(x => x is not null)
.Batch(HTTP_GET_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(MAX_PARALLELISM_HTTP, async x => await Download(x).ConfigureAwait(false), -1, false, cancellationToken)
.PipeAsync(
maxParallelism ?? MAX_PARALLELISM_HTTP,
async x => await Download(x).ConfigureAwait(false),
-1,
false,
cancellationToken
)
.Join()
.Batch(MAX_SAVE_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_SAVE_CACHE_PARALLELISM, SaveToCache, cancellationToken)
.ReadAllConcurrently(maxParallelism ?? MAX_SAVE_CACHE_PARALLELISM, SaveToCache, cancellationToken)
.ContinueWith(
t =>
{
Exception? ex = t.Exception;
if (ex is null && t.Status is TaskStatus.Canceled && !cancellationToken.IsCancellationRequested)
{
ex = new OperationCanceledException();
}
if (ex is not null)
{
if (ex is AggregateException ae)
{
_exceptions.AddRange(ae.Flatten().InnerExceptions);
}
else
{
_exceptions.Add(ex);
}
}
_channel.Writer.TryComplete(ex);
},
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Current
)
.ConfigureAwait(false);
public void CheckForExceptions()
{
if (_exceptions.Count > 0)
{
throw new AggregateException(_exceptions);
}
}
public abstract string? CheckCache(string id);
public abstract Task<List<T>> Download(List<string?> ids);
@@ -16,7 +16,7 @@ public abstract class ChannelSaver<T>
private const int MAX_CACHE_WRITE_PARALLELISM = 4;
private const int MAX_CACHE_BATCH = 500;
private readonly List<Exception> _lists = new();
private readonly List<Exception> _exceptions = new();
private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
new BoundedChannelOptions(SEND_CAPACITY)
{
@@ -35,7 +35,7 @@ public abstract class ChannelSaver<T>
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x => await SendToServer(x, cancellationToken).ConfigureAwait(false),
async x => await SendToServer(x).ConfigureAwait(false),
HTTP_CAPACITY,
false,
cancellationToken
@@ -55,9 +55,9 @@ public abstract class ChannelSaver<T>
if (ex is not null)
{
lock (_lists)
lock (_exceptions)
{
_lists.Add(ex);
_exceptions.Add(ex);
}
}
_checkCacheChannel.Writer.TryComplete(ex);
@@ -70,25 +70,25 @@ public abstract class ChannelSaver<T>
public async ValueTask Save(T item, CancellationToken cancellationToken) =>
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(true);
public async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch, CancellationToken cancellationToken)
public async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch)
{
await SendToServer((Batch<T>)batch, cancellationToken).ConfigureAwait(false);
await SendToServer((Batch<T>)batch).ConfigureAwait(false);
return batch;
}
public abstract Task SendToServer(Batch<T> batch, CancellationToken cancellationToken);
public abstract Task SendToServer(Batch<T> batch);
public void DoneTraversing() => _checkCacheChannel.Writer.TryComplete();
public async Task DoneSaving()
{
await _checkCacheChannel.Reader.Completion.ConfigureAwait(true);
lock (_lists)
lock (_exceptions)
{
if (_lists.Count > 0)
if (_exceptions.Count > 0)
{
var exceptions = new List<Exception>();
foreach (var ex in _lists)
foreach (var ex in _exceptions)
{
if (ex is AggregateException ae)
{
@@ -12,9 +12,9 @@ public partial class Operations
Uri url,
string streamId,
string objectId,
string? authorizationToken = null,
IProgress<ProgressArgs>? onProgressAction = null,
CancellationToken cancellationToken = default
string? authorizationToken,
IProgress<ProgressArgs>? onProgressAction,
CancellationToken cancellationToken
)
{
using var receiveActivity = activityFactory.Start("Operations.Receive");
@@ -28,9 +28,10 @@ public partial class Operations
url,
streamId,
authorizationToken,
onProgressAction
onProgressAction,
cancellationToken
);
var result = await process.Deserialize(objectId, cancellationToken).ConfigureAwait(false);
var result = await process.Deserialize(objectId).ConfigureAwait(false);
receiveActivity?.SetStatus(SdkActivityStatusCode.Ok);
return result;
}
@@ -132,7 +132,7 @@ public sealed class SpeckleObjectDeserializer
if (propName == "__closure")
{
reader.Read(); //goes to prop value
var closures = ClosureParser.GetClosures(reader);
var closures = ClosureParser.GetClosures(reader, CancellationToken);
if (closures.Any())
{
_total = 0;
@@ -5,13 +5,11 @@ namespace Speckle.Sdk.Serialisation.Utilities;
public static class ClosureParser
{
public static IReadOnlyList<(string, int)> GetClosures(string rootObjectJson)
public static IReadOnlyList<(string, int)> GetClosures(string json, CancellationToken cancellationToken)
{
try
{
using JsonTextReader reader = SpeckleObjectSerializerPool.Instance.GetJsonTextReader(
new StringReader(rootObjectJson)
);
using JsonTextReader reader = SpeckleObjectSerializerPool.Instance.GetJsonTextReader(new StringReader(json));
reader.Read();
while (reader.TokenType != JsonToken.EndObject)
{
@@ -19,7 +17,7 @@ public static class ClosureParser
{
case JsonToken.StartObject:
{
var closureList = ReadObject(reader);
var closureList = ReadObject(reader, cancellationToken);
return closureList;
}
default:
@@ -33,14 +31,15 @@ public static class ClosureParser
return [];
}
public static IEnumerable<string> GetChildrenIds(string rootObjectJson) =>
GetClosures(rootObjectJson).Select(x => x.Item1);
public static IEnumerable<string> GetChildrenIds(string json, CancellationToken cancellationToken) =>
GetClosures(json, cancellationToken).Select(x => x.Item1);
private static IReadOnlyList<(string, int)> ReadObject(JsonTextReader reader)
private static IReadOnlyList<(string, int)> ReadObject(JsonTextReader reader, CancellationToken cancellationToken)
{
reader.Read();
while (reader.TokenType != JsonToken.EndObject)
{
cancellationToken.ThrowIfCancellationRequested();
switch (reader.TokenType)
{
case JsonToken.PropertyName:
@@ -48,7 +47,7 @@ public static class ClosureParser
if (reader.Value as string == "__closure")
{
reader.Read(); //goes to prop vale
var closureList = ReadClosureEnumerable(reader);
var closureList = ReadClosureEnumerable(reader, cancellationToken);
return closureList;
}
reader.Read(); //goes to prop vale
@@ -66,24 +65,25 @@ public static class ClosureParser
return [];
}
public static IReadOnlyList<(string, int)> GetClosures(JsonReader reader)
public static IReadOnlyList<(string, int)> GetClosures(JsonReader reader, CancellationToken cancellationToken)
{
if (reader.TokenType != JsonToken.StartObject)
{
return Array.Empty<(string, int)>();
}
var closureList = ReadClosureEnumerable(reader);
var closureList = ReadClosureEnumerable(reader, cancellationToken);
closureList.Sort((a, b) => b.Item2.CompareTo(a.Item2));
return closureList;
}
private static List<(string, int)> ReadClosureEnumerable(JsonReader reader)
private static List<(string, int)> ReadClosureEnumerable(JsonReader reader, CancellationToken cancellationToken)
{
List<(string, int)> closureList = new();
reader.Read(); //startobject
while (reader.TokenType != JsonToken.EndObject)
{
cancellationToken.ThrowIfCancellationRequested();
var childId = (reader.Value as string).NotNull(); // propertyName
int childMinDepth = (reader.ReadAsInt32()).NotNull(); //propertyValue
reader.Read();
@@ -5,23 +5,29 @@ using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Serialisation.V2;
public sealed class DummySqLiteJsonCacheManager : ISqLiteJsonCacheManager
#pragma warning disable CA1063
public class DummySqLiteJsonCacheManager : ISqLiteJsonCacheManager
#pragma warning restore CA1063
{
#pragma warning disable CA1816
#pragma warning disable CA1063
public void Dispose() { }
#pragma warning restore CA1063
#pragma warning restore CA1816
public IReadOnlyCollection<(string, string)> GetAllObjects() => throw new NotImplementedException();
public void DeleteObject(string id) => throw new NotImplementedException();
public string? GetObject(string id) => throw new NotImplementedException();
public string? GetObject(string id) => null;
public void SaveObject(string id, string json) => throw new NotImplementedException();
public void UpdateObject(string id, string json) => throw new NotImplementedException();
public void SaveObjects(IEnumerable<(string id, string json)> items) => throw new NotImplementedException();
public virtual void SaveObjects(IEnumerable<(string id, string json)> items) => throw new NotImplementedException();
public bool HasObject(string objectId) => throw new NotImplementedException();
public bool HasObject(string objectId) => false;
}
public class DummySendServerObjectManager : IServerObjectManager
@@ -0,0 +1,26 @@
using System.Collections.Concurrent;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Models;
namespace Speckle.Sdk.Serialisation.V2.Receive;
[GenerateAutoInterface]
public class BaseDeserializer(IObjectDeserializerFactory objectDeserializerFactory) : IBaseDeserializer
{
public Base Deserialise(
ConcurrentDictionary<Id, Base> baseCache,
Id id,
Json json,
IReadOnlyCollection<Id> closures,
CancellationToken cancellationToken
)
{
if (baseCache.TryGetValue(id, out var baseObject))
{
return baseObject;
}
var deserializer = objectDeserializerFactory.Create(id, closures, baseCache);
return deserializer.Deserialize(json, cancellationToken);
}
}
@@ -8,9 +8,10 @@ using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Serialisation.V2.Receive;
public record DeserializeProcessOptions(
bool SkipCache,
bool SkipCache = false,
bool ThrowOnMissingReferences = true,
bool SkipInvalidConverts = false
bool SkipInvalidConverts = false,
int? MaxParallelism = null
);
public partial interface IDeserializeProcess : IDisposable;
@@ -19,36 +20,38 @@ public partial interface IDeserializeProcess : IDisposable;
public sealed class DeserializeProcess(
IProgress<ProgressArgs>? progress,
IObjectLoader objectLoader,
IObjectDeserializerFactory objectDeserializerFactory,
IBaseDeserializer baseDeserializer,
CancellationToken cancellationToken,
DeserializeProcessOptions? options = null
) : IDeserializeProcess
{
private readonly DeserializeProcessOptions _options = options ?? new(false);
private readonly DeserializeProcessOptions _options = options ?? new();
private readonly ConcurrentDictionary<string, (string, IReadOnlyCollection<string>)> _closures = new();
private readonly ConcurrentDictionary<string, Base> _baseCache = new();
private readonly ConcurrentDictionary<string, Task> _activeTasks = new();
private readonly ConcurrentDictionary<Id, (Json, IReadOnlyCollection<Id>)> _closures = new();
private readonly ConcurrentDictionary<Id, Base> _baseCache = new();
private readonly ConcurrentDictionary<Id, Task> _activeTasks = new();
public IReadOnlyDictionary<string, Base> BaseCache => _baseCache;
public IReadOnlyDictionary<Id, Base> BaseCache => _baseCache;
public long Total { get; private set; }
[AutoInterfaceIgnore]
public void Dispose() => objectLoader.Dispose();
public async Task<Base> Deserialize(string rootId, CancellationToken cancellationToken)
public async Task<Base> Deserialize(string rootId)
{
var (rootJson, childrenIds) = await objectLoader
.GetAndCache(rootId, _options, cancellationToken)
.ConfigureAwait(false);
Total = childrenIds.Count;
Total++;
_closures.TryAdd(rootId, (rootJson, childrenIds));
var root = new Id(rootId);
_closures.TryAdd(root, (rootJson, childrenIds));
progress?.Report(new(ProgressEvent.DeserializeObject, _baseCache.Count, childrenIds.Count));
await Traverse(rootId, cancellationToken).ConfigureAwait(false);
return _baseCache[rootId];
await Traverse(root).ConfigureAwait(false);
return _baseCache[root];
}
private async Task Traverse(string id, CancellationToken cancellationToken)
private async Task Traverse(Id id)
{
if (_baseCache.ContainsKey(id))
{
@@ -71,11 +74,12 @@ public sealed class DeserializeProcess(
{
// tmp is necessary because of the way closures close over loop variables
var tmpId = childId;
cancellationToken.ThrowIfCancellationRequested();
Task t = Task
.Factory.StartNew(
() => Traverse(tmpId, cancellationToken),
() => Traverse(tmpId),
cancellationToken,
TaskCreationOptions.AttachedToParent,
TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness,
TaskScheduler.Default
)
.Unwrap();
@@ -97,16 +101,22 @@ public sealed class DeserializeProcess(
}
}
private (string, IReadOnlyCollection<string>) GetClosures(string id)
private (Json, IReadOnlyCollection<Id>) GetClosures(Id id)
{
if (!_closures.TryGetValue(id, out var closures))
{
var json = objectLoader.LoadId(id);
if (json == null)
var j = objectLoader.LoadId(id.Value);
if (j == null)
{
throw new SpeckleException($"Missing object id in SQLite cache: {id}");
}
var childrenIds = ClosureParser.GetClosures(json).OrderByDescending(x => x.Item2).Select(x => x.Item1).Freeze();
var json = new Json(j);
var childrenIds = ClosureParser
.GetClosures(json.Value, cancellationToken)
.OrderByDescending(x => x.Item2)
.Select(x => new Id(x.Item1))
.Freeze();
closures = (json, childrenIds);
_closures.TryAdd(id, closures);
}
@@ -114,28 +124,17 @@ public sealed class DeserializeProcess(
return closures;
}
public void DecodeOrEnqueueChildren(string id)
public void DecodeOrEnqueueChildren(Id id)
{
if (_baseCache.ContainsKey(id))
{
return;
}
(string json, IReadOnlyCollection<string> closures) = GetClosures(id);
var @base = Deserialise(id, json, closures);
(Json json, IReadOnlyCollection<Id> closures) = GetClosures(id);
var @base = baseDeserializer.Deserialise(_baseCache, id, json, closures, cancellationToken);
_baseCache.TryAdd(id, @base);
//remove from JSON cache because we've finally made the Base
_closures.TryRemove(id, out _);
_activeTasks.TryRemove(id, out _);
}
private Base Deserialise(string id, string json, IReadOnlyCollection<string> closures)
{
if (_baseCache.TryGetValue(id, out var baseObject))
{
return baseObject;
}
var deserializer = objectDeserializerFactory.Create(id, closures, _baseCache);
return deserializer.Deserialize(json);
}
}
@@ -9,9 +9,9 @@ namespace Speckle.Sdk.Serialisation.V2.Receive;
[GenerateAutoInterface]
public sealed class ObjectDeserializer(
string currentId,
IReadOnlyCollection<string> currentClosures,
IReadOnlyDictionary<string, Base> references,
Id currentId,
IReadOnlyCollection<Id> currentClosures,
IReadOnlyDictionary<Id, Base> references,
SpeckleObjectSerializerPool pool,
DeserializeProcessOptions? options = null
) : IObjectDeserializer
@@ -21,17 +21,13 @@ public sealed class ObjectDeserializer(
/// <exception cref="ArgumentNullException"><paramref name="objectJson"/> was null</exception>
/// <exception cref="SpeckleDeserializeException"><paramref name="objectJson"/> cannot be deserialised to type <see cref="Base"/></exception>
// /// <exception cref="TransportException"><see cref="ReadTransport"/> did not contain the required json objects (closures)</exception>
public Base Deserialize(string objectJson)
public Base Deserialize(Json objectJson, CancellationToken cancellationToken)
{
if (objectJson is null)
{
throw new ArgumentNullException(nameof(objectJson), $"Cannot deserialize {nameof(objectJson)}, value was null");
}
// Apparently this automatically parses DateTimes in strings if it matches the format:
// JObject doc1 = JObject.Parse(objectJson);
// This is equivalent code that doesn't parse datetimes:
using var stringReader = new StringReader(objectJson);
using var stringReader = new StringReader(objectJson.Value);
using JsonTextReader reader = pool.GetJsonTextReader(stringReader);
reader.DateParseHandling = DateParseHandling.None;
@@ -40,7 +36,7 @@ public sealed class ObjectDeserializer(
try
{
reader.Read();
converted = (Base)ReadObject(reader).NotNull();
converted = (Base)ReadObject(reader, cancellationToken).NotNull();
}
catch (Exception ex) when (!ex.IsFatal() && ex is not OperationCanceledException)
{
@@ -50,13 +46,13 @@ public sealed class ObjectDeserializer(
return converted;
}
private List<object?> ReadArrayAsync(JsonReader reader)
private List<object?> ReadArrayAsync(JsonReader reader, CancellationToken cancellationToken)
{
reader.Read();
List<object?> retList = new();
while (reader.TokenType != JsonToken.EndArray)
{
object? convertedValue = ReadProperty(reader);
object? convertedValue = ReadProperty(reader, cancellationToken);
if (convertedValue is DataChunk chunk)
{
retList.AddRange(chunk.data);
@@ -70,7 +66,7 @@ public sealed class ObjectDeserializer(
return retList;
}
private object? ReadObject(JsonReader reader)
private object? ReadObject(JsonReader reader, CancellationToken cancellationToken)
{
reader.Read();
Dictionary<string, object?> dict = Pools.ObjectDictionaries.Get();
@@ -82,7 +78,7 @@ public sealed class ObjectDeserializer(
{
var propName = reader.Value.NotNull().ToString().NotNull();
reader.Read(); //goes prop value
object? convertedValue = ReadProperty(reader);
object? convertedValue = ReadProperty(reader, cancellationToken);
dict[propName] = convertedValue;
reader.Read(); //goes to next
}
@@ -99,7 +95,8 @@ public sealed class ObjectDeserializer(
if (speckleType as string == "reference" && dict.TryGetValue("referencedId", out object? referencedId))
{
var objId = (string)referencedId.NotNull();
var objId = new Id((string)referencedId.NotNull());
cancellationToken.ThrowIfCancellationRequested();
if (!currentClosures.Contains(objId) && (options is null || options.ThrowOnMissingReferences))
{
throw new InvalidOperationException($"current Id: {currentId} has missing closure: {objId}");
@@ -123,7 +120,7 @@ public sealed class ObjectDeserializer(
return b;
}
private object? ReadProperty(JsonReader reader)
private object? ReadProperty(JsonReader reader, CancellationToken cancellationToken)
{
switch (reader.TokenType)
{
@@ -156,9 +153,9 @@ public sealed class ObjectDeserializer(
case JsonToken.Date:
return (DateTime)reader.Value.NotNull();
case JsonToken.StartArray:
return ReadArrayAsync(reader);
return ReadArrayAsync(reader, cancellationToken);
case JsonToken.StartObject:
var dict = ReadObject(reader);
var dict = ReadObject(reader, cancellationToken);
return dict;
default:
@@ -7,9 +7,9 @@ namespace Speckle.Sdk.Serialisation.V2.Receive;
public class ObjectDeserializerFactory : IObjectDeserializerFactory
{
public IObjectDeserializer Create(
string currentId,
IReadOnlyCollection<string> currentClosures,
IReadOnlyDictionary<string, Base> references,
Id currentId,
IReadOnlyCollection<Id> currentClosures,
IReadOnlyDictionary<Id, Base> references,
DeserializeProcessOptions? options = null
) => new ObjectDeserializer(currentId, currentClosures, references, SpeckleObjectSerializerPool.Instance, options);
}
@@ -23,12 +23,12 @@ public sealed class ObjectLoader(
private long _cached;
private long _downloaded;
private long _totalToDownload;
private DeserializeProcessOptions _options = new(false);
private DeserializeProcessOptions _options = new();
[AutoInterfaceIgnore]
public void Dispose() => sqLiteJsonCacheManager.Dispose();
public async Task<(string, IReadOnlyCollection<string>)> GetAndCache(
public async Task<(Json, IReadOnlyCollection<Id>)> GetAndCache(
string rootId,
DeserializeProcessOptions options,
CancellationToken cancellationToken
@@ -42,32 +42,35 @@ public sealed class ObjectLoader(
if (rootJson != null)
{
//assume everything exists as the root is there.
var allChildren = ClosureParser.GetChildrenIds(rootJson).ToList();
var allChildren = ClosureParser.GetChildrenIds(rootJson, cancellationToken).Select(x => new Id(x)).ToList();
//this probably yields away from the Main thread to let host apps update progress
//in any case, this fixes a Revit only issue for this situation
await Task.Yield();
return (rootJson, allChildren);
return (new(rootJson), allChildren);
}
}
rootJson = await serverObjectManager
.DownloadSingleObject(rootId, progress, cancellationToken)
.NotNull()
.ConfigureAwait(false);
IReadOnlyCollection<string> allChildrenIds = ClosureParser
.GetClosures(rootJson)
IReadOnlyCollection<Id> allChildrenIds = ClosureParser
.GetClosures(rootJson, cancellationToken)
.OrderByDescending(x => x.Item2)
.Select(x => x.Item1)
.Where(x => !x.StartsWith("blob", StringComparison.Ordinal))
.Select(x => new Id(x.Item1))
.Where(x => !x.Value.StartsWith("blob", StringComparison.Ordinal))
.Freeze();
_allChildrenCount = allChildrenIds.Count;
await GetAndCache(allChildrenIds, cancellationToken).ConfigureAwait(false);
await GetAndCache(allChildrenIds.Select(x => x.Value), cancellationToken, _options.MaxParallelism)
.ConfigureAwait(false);
CheckForExceptions();
cancellationToken.ThrowIfCancellationRequested();
//save the root last to shortcut later
if (!options.SkipCache)
{
sqLiteJsonCacheManager.SaveObject(rootId, rootJson);
}
return (rootJson, allChildrenIds);
return (new(rootJson), allChildrenIds);
}
[AutoInterfaceIgnore]
@@ -0,0 +1,76 @@
using System.Collections.Concurrent;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Common;
using Speckle.Sdk.Dependencies;
using Speckle.Sdk.Models;
using Speckle.Sdk.SQLite;
using Closures = System.Collections.Generic.Dictionary<Speckle.Sdk.Serialisation.Id, int>;
namespace Speckle.Sdk.Serialisation.V2.Send;
[GenerateAutoInterface]
public class BaseSerializer(
ISqLiteJsonCacheManager sqLiteJsonCacheManager,
IObjectSerializerFactory objectSerializerFactory
) : IBaseSerializer
{
private readonly Pool<List<(Id, Json, Closures)>> _pool = Pools.CreateListPool<(Id, Json, Closures)>();
private readonly ConcurrentDictionary<Id, ObjectReference> _objectReferences = new();
public IReadOnlyDictionary<Id, ObjectReference> ObjectReferences => _objectReferences;
//leave this sync
public IEnumerable<BaseItem> Serialise(
Base obj,
IReadOnlyDictionary<Id, NodeInfo> childInfo,
bool skipCacheRead,
CancellationToken cancellationToken
)
{
if (!skipCacheRead && obj.id != null)
{
var cachedJson = sqLiteJsonCacheManager.GetObject(obj.id);
if (cachedJson != null)
{
yield return new BaseItem(new(obj.id.NotNull()), new(cachedJson), false, null);
yield break;
}
}
using var serializer2 = objectSerializerFactory.Create(childInfo, cancellationToken);
var items = _pool.Get();
try
{
items.AddRange(serializer2.Serialize(obj));
foreach (var kvp in serializer2.ObjectReferences)
{
_objectReferences.TryAdd(kvp.Key, kvp.Value);
}
var (id, json, closures) = items.First();
yield return CheckCache(id, json, closures, skipCacheRead);
foreach (var (cid, cJson, cClosures) in items.Skip(1))
{
yield return CheckCache(cid, cJson, cClosures, skipCacheRead);
}
}
finally
{
_pool.Return(items);
}
}
private BaseItem CheckCache(Id id, Json json, Dictionary<Id, int> closures, bool skipCacheRead)
{
if (!skipCacheRead)
{
var cachedJson = sqLiteJsonCacheManager.GetObject(id.Value);
if (cachedJson != null)
{
return new BaseItem(id, new(cachedJson), false, null);
}
}
return new BaseItem(id, json, true, closures);
}
}
@@ -15,8 +15,8 @@ namespace Speckle.Sdk.Serialisation.V2.Send;
public readonly record struct NodeInfo(Json Json, Closures? C)
{
public Closures GetClosures() =>
C ?? ClosureParser.GetClosures(Json.Value).ToDictionary(x => new Id(x.Item1), x => x.Item2);
public Closures GetClosures(CancellationToken cancellationToken) =>
C ?? ClosureParser.GetClosures(Json.Value, cancellationToken).ToDictionary(x => new Id(x.Item1), x => x.Item2);
}
public partial interface IObjectSerializer : IDisposable;
@@ -283,7 +283,7 @@ public sealed class ObjectSerializer : IObjectSerializer
if (baseObj.id != null && _childCache.TryGetValue(new(baseObj.id), out var info))
{
id = new Id(baseObj.id);
childClosures = info.GetClosures();
childClosures = info.GetClosures(_cancellationToken);
json = info.Json;
MergeClosures(_currentClosures, childClosures);
}
@@ -42,6 +42,7 @@ public sealed class PriorityScheduler(
{
break;
}
TryExecuteTask(t);
if (cancellationToken.IsCancellationRequested)
{
@@ -49,6 +50,10 @@ public sealed class PriorityScheduler(
}
}
}
catch (OperationCanceledException)
{
//cancelling so end thread
}
#pragma warning disable CA1031
catch (Exception e)
#pragma warning restore CA1031
@@ -7,7 +7,6 @@ using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Models;
using Speckle.Sdk.SQLite;
using Speckle.Sdk.Transports;
using Closures = System.Collections.Generic.Dictionary<Speckle.Sdk.Serialisation.Id, int>;
namespace Speckle.Sdk.Serialisation.V2.Send;
@@ -31,7 +30,7 @@ public sealed class SerializeProcess(
ISqLiteJsonCacheManager sqLiteJsonCacheManager,
IServerObjectManager serverObjectManager,
IBaseChildFinder baseChildFinder,
IObjectSerializerFactory objectSerializerFactory,
IBaseSerializer baseSerializer,
ILoggerFactory loggerFactory,
CancellationToken cancellationToken,
SerializeProcessOptions? options = null
@@ -50,11 +49,9 @@ public sealed class SerializeProcess(
cancellationToken
);
private readonly SerializeProcessOptions _options = options ?? new(false, false, false, false);
private readonly SerializeProcessOptions _options = options ?? new();
private readonly ILogger<SerializeProcess> _logger = loggerFactory.CreateLogger<SerializeProcess>();
private readonly ConcurrentDictionary<Id, ObjectReference> _objectReferences = new();
private readonly Pool<List<(Id, Json, Closures)>> _pool = Pools.CreateListPool<(Id, Json, Closures)>();
private readonly Pool<Dictionary<Id, NodeInfo>> _currentClosurePool = Pools.CreateDictionaryPool<Id, NodeInfo>();
private readonly Pool<ConcurrentDictionary<Id, NodeInfo>> _childClosurePool = Pools.CreateConcurrentDictionaryPool<
Id,
@@ -83,6 +80,7 @@ public sealed class SerializeProcess(
var findTotalObjectsTask = Task.CompletedTask;
if (!_options.SkipFindTotalObjects)
{
cancellationToken.ThrowIfCancellationRequested();
findTotalObjectsTask = Task.Factory.StartNew(
() => TraverseTotal(root),
cancellationToken,
@@ -90,11 +88,12 @@ public sealed class SerializeProcess(
_highest
);
}
await Traverse(root, cancellationToken).ConfigureAwait(true);
await Traverse(root).ConfigureAwait(true);
DoneTraversing();
await Task.WhenAll(findTotalObjectsTask, channelTask).ConfigureAwait(true);
await DoneSaving().ConfigureAwait(true);
return new(root.id.NotNull(), _objectReferences.Freeze());
cancellationToken.ThrowIfCancellationRequested();
return new(root.id.NotNull(), baseSerializer.ObjectReferences.Freeze());
}
private void TraverseTotal(Base obj)
@@ -107,19 +106,17 @@ public sealed class SerializeProcess(
}
}
private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj, CancellationToken cancellationToken)
private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj)
{
var tasks = new List<Task<Dictionary<Id, NodeInfo>>>();
foreach (var child in baseChildFinder.GetChildren(obj))
{
// tmp is necessary because of the way closures close over loop variables
var tmp = child;
cancellationToken.ThrowIfCancellationRequested();
var t = Task
.Factory.StartNew(
async () =>
{
return await Traverse(tmp, cancellationToken).ConfigureAwait(true);
},
async () => await Traverse(tmp).ConfigureAwait(true),
cancellationToken,
TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness,
_belowNormal
@@ -143,7 +140,7 @@ public sealed class SerializeProcess(
_currentClosurePool.Return(childClosure);
}
var items = Serialise(obj, childClosures, cancellationToken);
var items = baseSerializer.Serialise(obj, childClosures, _options.SkipCacheRead, cancellationToken);
var currentClosures = _currentClosurePool.Get();
Interlocked.Increment(ref _objectCount);
@@ -165,60 +162,7 @@ public sealed class SerializeProcess(
return currentClosures;
}
//leave this sync
private IEnumerable<BaseItem> Serialise(
Base obj,
IReadOnlyDictionary<Id, NodeInfo> childInfo,
CancellationToken cancellationToken
)
{
if (!_options.SkipCacheRead && obj.id != null)
{
var cachedJson = sqLiteJsonCacheManager.GetObject(obj.id);
if (cachedJson != null)
{
yield return new BaseItem(new(obj.id.NotNull()), new(cachedJson), false, null);
yield break;
}
}
using var serializer2 = objectSerializerFactory.Create(childInfo, cancellationToken);
var items = _pool.Get();
try
{
items.AddRange(serializer2.Serialize(obj));
foreach (var kvp in serializer2.ObjectReferences)
{
_objectReferences.TryAdd(kvp.Key, kvp.Value);
}
var (id, json, closures) = items.First();
yield return CheckCache(id, json, closures);
foreach (var (cid, cJson, cClosures) in items.Skip(1))
{
yield return CheckCache(cid, cJson, cClosures);
}
}
finally
{
_pool.Return(items);
}
}
private BaseItem CheckCache(Id id, Json json, Dictionary<Id, int> closures)
{
if (!_options.SkipCacheRead)
{
var cachedJson = sqLiteJsonCacheManager.GetObject(id.Value);
if (cachedJson != null)
{
return new BaseItem(id, new(cachedJson), false, null);
}
}
return new BaseItem(id, json, true, closures);
}
public override async Task SendToServer(Batch<BaseItem> batch, CancellationToken cancellationToken)
public override async Task SendToServer(Batch<BaseItem> batch)
{
try
{
@@ -238,6 +182,10 @@ public sealed class SerializeProcess(
progress?.Report(new(ProgressEvent.UploadedObjects, _uploaded, null));
}
}
catch (OperationCanceledException)
{
throw;
}
#pragma warning disable CA1031
catch (Exception e)
#pragma warning restore CA1031
@@ -258,6 +206,10 @@ public sealed class SerializeProcess(
progress?.Report(new(ProgressEvent.CachedToLocal, _cached, _objectsSerialized));
}
}
catch (OperationCanceledException)
{
throw;
}
#pragma warning disable CA1031
catch (Exception e)
#pragma warning restore CA1031
@@ -21,6 +21,7 @@ public interface ISerializeProcessFactory
string streamId,
string? authorizationToken,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken,
DeserializeProcessOptions? options = null
);
}
@@ -28,7 +29,7 @@ public interface ISerializeProcessFactory
public class SerializeProcessFactory(
IBaseChildFinder baseChildFinder,
IObjectSerializerFactory objectSerializerFactory,
IObjectDeserializerFactory objectDeserializerFactory,
IBaseDeserializer baseDeserializer,
ISqLiteJsonCacheManagerFactory sqLiteJsonCacheManagerFactory,
IServerObjectManagerFactory serverObjectManagerFactory,
ILoggerFactory loggerFactory
@@ -50,7 +51,7 @@ public class SerializeProcessFactory(
sqLiteJsonCacheManager,
serverObjectManager,
baseChildFinder,
objectSerializerFactory,
new BaseSerializer(sqLiteJsonCacheManager, objectSerializerFactory),
loggerFactory,
cancellationToken,
options
@@ -62,6 +63,7 @@ public class SerializeProcessFactory(
string streamId,
string? authorizationToken,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken,
DeserializeProcessOptions? options = null
)
{
@@ -72,6 +74,6 @@ public class SerializeProcessFactory(
//owned by process, refactor later
var objectLoader = new ObjectLoader(sqLiteJsonCacheManager, serverObjectManager, progress);
#pragma warning restore CA2000
return new DeserializeProcess(progress, objectLoader, objectDeserializerFactory, options);
return new DeserializeProcess(progress, objectLoader, baseDeserializer, cancellationToken, options);
}
}
@@ -142,7 +142,7 @@ public sealed class ServerTransport : IServerTransport
api.CancellationToken = CancellationToken;
string? rootObjectJson = await api.DownloadSingleObject(StreamId, id, OnProgressAction).ConfigureAwait(false);
var allIds = ClosureParser.GetChildrenIds(rootObjectJson.NotNull()).ToList();
var allIds = ClosureParser.GetChildrenIds(rootObjectJson.NotNull(), CancellationToken).ToList();
var childrenIds = allIds.Where(x => !x.Contains("blob:"));
var blobIds = allIds.Where(x => x.Contains("blob:")).Select(x => x.Remove(0, 5));
@@ -29,7 +29,7 @@ public static class TransportHelpers
targetTransport.SaveObject(id, parent);
var closures = ClosureParser.GetChildrenIds(parent).ToList();
var closures = ClosureParser.GetChildrenIds(parent, cancellationToken).ToList();
foreach (var closure in closures)
{
@@ -44,13 +44,13 @@ var progress = new Progress(true);
var factory = new SerializeProcessFactory(
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new ObjectDeserializerFactory(),
new BaseDeserializer(new ObjectDeserializerFactory()),
serviceProvider.GetRequiredService<ISqLiteJsonCacheManagerFactory>(),
serviceProvider.GetRequiredService<IServerObjectManagerFactory>(),
new NullLoggerFactory()
);
var process = factory.CreateDeserializeProcess(new Uri(url), streamId, token, progress, new(skipCacheReceive));
var @base = await process.Deserialize(rootId, default).ConfigureAwait(false);
var process = factory.CreateDeserializeProcess(new Uri(url), streamId, token, progress, default, new(skipCacheReceive));
var @base = await process.Deserialize(rootId).ConfigureAwait(false);
Console.WriteLine("Deserialized");
Console.ReadLine();
Console.WriteLine("Executing");
@@ -0,0 +1,51 @@
using Speckle.Sdk.Serialisation.V2;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.Testing.Framework;
using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Serialization.Tests;
public sealed class CancellationSqLiteJsonCacheManager(CancellationTokenSource cancellationTokenSource)
: DummySqLiteJsonCacheManager
{
public override void SaveObjects(IEnumerable<(string id, string json)> items)
{
cancellationTokenSource.Cancel();
cancellationTokenSource.Token.ThrowIfCancellationRequested();
}
}
public class CancellationSqLiteSendManager(CancellationTokenSource cancellationTokenSource) : DummySqLiteSendManager
{
public override void SaveObjects(IEnumerable<(string id, string json)> items)
{
cancellationTokenSource.Cancel();
cancellationTokenSource.Token.ThrowIfCancellationRequested();
}
}
public class CancellationServerObjectManager(CancellationTokenSource cancellationTokenSource) : DummyServerObjectManager
{
public override Task UploadObjects(
IReadOnlyList<BaseItem> objects,
bool compressPayloads,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
)
{
cancellationTokenSource.Cancel();
cancellationTokenSource.Token.ThrowIfCancellationRequested();
return base.UploadObjects(objects, compressPayloads, progress, cancellationToken);
}
public override Task<string?> DownloadSingleObject(
string objectId,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
)
{
cancellationTokenSource.Cancel();
cancellationTokenSource.Token.ThrowIfCancellationRequested();
return base.DownloadSingleObject(objectId, progress, cancellationToken);
}
}
@@ -0,0 +1,5 @@
{
"Type": "System.OperationCanceledException",
"Message": "The operation was canceled.",
"Source": "System.Private.CoreLib"
}
@@ -0,0 +1,5 @@
{
"Type": "System.OperationCanceledException",
"Message": "The operation was canceled.",
"Source": "System.Private.CoreLib"
}
@@ -0,0 +1,5 @@
{
"Type": "System.OperationCanceledException",
"Message": "The operation was canceled.",
"Source": "System.Private.CoreLib"
}
@@ -0,0 +1,5 @@
{
"Type": "System.OperationCanceledException",
"Message": "The operation was canceled.",
"Source": "System.Private.CoreLib"
}
@@ -0,0 +1,5 @@
{
"Type": "System.OperationCanceledException",
"Message": "The operation was canceled.",
"Source": "System.Private.CoreLib"
}
@@ -0,0 +1,5 @@
{
"Type": "System.OperationCanceledException",
"Message": "The operation was canceled.",
"Source": "System.Private.CoreLib"
}
@@ -0,0 +1,198 @@
using System.Collections.Concurrent;
using FluentAssertions;
using Microsoft.Extensions.Logging.Abstractions;
using Speckle.Objects.Geometry;
using Speckle.Sdk.Host;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation;
using Speckle.Sdk.Serialisation.V2.Receive;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.Serialization.Tests.Framework;
using Speckle.Sdk.Testing.Framework;
namespace Speckle.Sdk.Serialization.Tests;
public class CancellationTests
{
public CancellationTests()
{
TypeLoader.Reset();
TypeLoader.Initialize(typeof(Base).Assembly, typeof(DetachedTests).Assembly, typeof(Polyline).Assembly);
}
[Fact]
public async Task Cancellation_Serialize()
{
var testClass = new TestClass() { RegularProperty = "Hello" };
using var cancellationSource = new CancellationTokenSource();
using var serializeProcess = new SerializeProcess(
null,
new DummySqLiteSendManager(),
new DummyServerObjectManager(),
new BaseChildFinder(new BasePropertyGatherer()),
new BaseSerializer(new DummySqLiteSendManager(), new ObjectSerializerFactory(new BasePropertyGatherer())),
new NullLoggerFactory(),
cancellationSource.Token,
new SerializeProcessOptions(true, true, false, true)
);
await cancellationSource.CancelAsync();
var ex = await Assert.ThrowsAsync<OperationCanceledException>(
async () => await serializeProcess.Serialize(testClass)
);
await Verify(ex);
cancellationSource.IsCancellationRequested.Should().BeTrue();
}
[Fact]
public async Task Cancellation_Save_Server()
{
var testClass = new TestClass() { RegularProperty = "Hello" };
using var cancellationSource = new CancellationTokenSource();
using var serializeProcess = new SerializeProcess(
null,
new DummySqLiteSendManager(),
new CancellationServerObjectManager(cancellationSource),
new BaseChildFinder(new BasePropertyGatherer()),
new BaseSerializer(new DummySqLiteSendManager(), new ObjectSerializerFactory(new BasePropertyGatherer())),
new NullLoggerFactory(),
cancellationSource.Token,
new SerializeProcessOptions(true, false, false, true)
);
var ex = await Assert.ThrowsAsync<OperationCanceledException>(
async () => await serializeProcess.Serialize(testClass)
);
await Verify(ex);
cancellationSource.IsCancellationRequested.Should().BeTrue();
}
[Fact]
public async Task Cancellation_Save_Sqlite()
{
var testClass = new TestClass() { RegularProperty = "Hello" };
using var cancellationSource = new CancellationTokenSource();
using var serializeProcess = new SerializeProcess(
null,
new CancellationSqLiteSendManager(cancellationSource),
new DummyServerObjectManager(),
new BaseChildFinder(new BasePropertyGatherer()),
new BaseSerializer(new DummySqLiteSendManager(), new ObjectSerializerFactory(new BasePropertyGatherer())),
new NullLoggerFactory(),
cancellationSource.Token,
new SerializeProcessOptions(true, false, false, true)
);
var ex = await Assert.ThrowsAsync<OperationCanceledException>(
async () => await serializeProcess.Serialize(testClass)
);
await Verify(ex);
cancellationSource.IsCancellationRequested.Should().BeTrue();
}
[Theory]
[InlineData("RevitObject.json.gz", "3416d3fe01c9196115514c4a2f41617b", 7818)]
public async Task Cancellation_Receive_Cache(string fileName, string rootId, int oldCount)
{
var closures = await TestFileManager.GetFileAsClosures(fileName);
closures.Count.Should().Be(oldCount);
using var cancellationSource = new CancellationTokenSource();
var o = new ObjectLoader(
new CancellationSqLiteJsonCacheManager(cancellationSource),
new DummyReceiveServerObjectManager(closures),
null
);
using var process = new DeserializeProcess(
null,
o,
new BaseDeserializer(new ObjectDeserializerFactory()),
cancellationSource.Token,
new(MaxParallelism: 1)
);
var ex = await Assert.ThrowsAsync<OperationCanceledException>(async () =>
{
var root = await process.Deserialize(rootId);
});
await Verify(ex);
cancellationSource.IsCancellationRequested.Should().BeTrue();
}
[Theory]
[InlineData("RevitObject.json.gz", "3416d3fe01c9196115514c4a2f41617b", 7818)]
public async Task Cancellation_Receive_Server(string fileName, string rootId, int oldCount)
{
var closures = await TestFileManager.GetFileAsClosures(fileName);
closures.Count.Should().Be(oldCount);
using var cancellationSource = new CancellationTokenSource();
var o = new ObjectLoader(
new DummyCancellationSqLiteSendManager(),
new CancellationServerObjectManager(cancellationSource),
null
);
using var process = new DeserializeProcess(
null,
o,
new BaseDeserializer(new ObjectDeserializerFactory()),
cancellationSource.Token,
new(MaxParallelism: 1)
);
var ex = await Assert.ThrowsAsync<OperationCanceledException>(async () =>
{
var root = await process.Deserialize(rootId);
});
await Verify(ex);
cancellationSource.IsCancellationRequested.Should().BeTrue();
}
[Theory]
[InlineData("RevitObject.json.gz", "3416d3fe01c9196115514c4a2f41617b", 7818)]
public async Task Cancellation_Receive_Deserialize(string fileName, string rootId, int oldCount)
{
var closures = await TestFileManager.GetFileAsClosures(fileName);
closures.Count.Should().Be(oldCount);
using var cancellationSource = new CancellationTokenSource();
var o = new ObjectLoader(
new DummySqLiteReceiveManager(closures),
new DummyReceiveServerObjectManager(closures),
null
);
using var process = new DeserializeProcess(
null,
o,
new CancellationBaseDeserializer(cancellationSource),
cancellationSource.Token,
new(MaxParallelism: 1)
);
var ex = await Assert.ThrowsAsync<OperationCanceledException>(async () =>
{
var root = await process.Deserialize(rootId);
});
await Verify(ex);
cancellationSource.IsCancellationRequested.Should().BeTrue();
}
}
public class CancellationBaseDeserializer(CancellationTokenSource cancellationTokenSource) : IBaseDeserializer
{
public Base Deserialise(
ConcurrentDictionary<Id, Base> baseCache,
Id id,
Json json,
IReadOnlyCollection<Id> closures,
CancellationToken cancellationToken
)
{
cancellationTokenSource.Cancel();
cancellationTokenSource.Token.ThrowIfCancellationRequested();
throw new NotImplementedException();
}
}
@@ -37,7 +37,7 @@ public class DetachedTests
new DummySendCacheManager(objects),
new DummyServerObjectManager(),
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new BaseSerializer(new DummySendCacheManager(objects), new ObjectSerializerFactory(new BasePropertyGatherer())),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, true, true)
@@ -122,7 +122,7 @@ public class DetachedTests
new DummySendCacheManager(objects),
new DummyServerObjectManager(),
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new BaseSerializer(new DummySendCacheManager(objects), new ObjectSerializerFactory(new BasePropertyGatherer())),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, true, true)
@@ -192,7 +192,7 @@ public class DetachedTests
new DummySendCacheManager(objects),
new DummyServerObjectManager(),
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new BaseSerializer(new DummySendCacheManager(objects), new ObjectSerializerFactory(new BasePropertyGatherer())),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, true, true)
@@ -227,7 +227,7 @@ public class DetachedTests
new DummySendCacheManager(objects),
new DummyServerObjectManager(),
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new BaseSerializer(new DummySendCacheManager(objects), new ObjectSerializerFactory(new BasePropertyGatherer())),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, true, true)
@@ -300,7 +300,7 @@ public class DummyServerObjectManager : IServerObjectManager
CancellationToken cancellationToken
) => throw new NotImplementedException();
public Task<string?> DownloadSingleObject(
public virtual Task<string?> DownloadSingleObject(
string objectId,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
@@ -311,7 +311,7 @@ public class DummyServerObjectManager : IServerObjectManager
CancellationToken cancellationToken
) => Task.FromResult(objectIds.ToDictionary(x => x, _ => false));
public Task UploadObjects(
public virtual Task UploadObjects(
IReadOnlyList<BaseItem> objects,
bool compressPayloads,
IProgress<ProgressArgs>? progress,
@@ -0,0 +1,22 @@
using Speckle.Sdk.SQLite;
namespace Speckle.Sdk.Serialization.Tests;
public class DummyCancellationSqLiteSendManager : ISqLiteJsonCacheManager
{
public string? GetObject(string id) => null;
public void SaveObject(string id, string json) => throw new NotImplementedException();
public void UpdateObject(string id, string json) => throw new NotImplementedException();
public virtual void SaveObjects(IEnumerable<(string id, string json)> items) => throw new NotImplementedException();
public bool HasObject(string objectId) => throw new NotImplementedException();
public IReadOnlyCollection<(string, string)> GetAllObjects() => throw new NotImplementedException();
public void DeleteObject(string id) => throw new NotImplementedException();
public void Dispose() { }
}
@@ -1,10 +1,8 @@
{
"Type": "AggregateException",
"Type": "System.AggregateException",
"InnerException": {
"Data": {},
"Message": "The method or operation is not implemented.",
"StackTrace": "at Speckle.Sdk.Serialization.Tests.ExceptionSendCacheManager.SaveObjects(IEnumerable`1 items)\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.SaveToCache(List`1 batch)\nat Open.ChannelExtensions.<28d3e838-dde6-44a1-8f5e-d1c739a178d0>Extensions.<>c__DisplayClass98_0`1.<ReadAllConcurrently>b__0(T e)\nat Open.ChannelExtensions.<28d3e838-dde6-44a1-8f5e-d1c739a178d0>Extensions.<>c__DisplayClass92_0`1.<ReadAllConcurrentlyAsync>b__2(T item, Int64 _)\nat Open.ChannelExtensions.<28d3e838-dde6-44a1-8f5e-d1c739a178d0>Extensions.ReadUntilCancelledAsync[T](<5e816acc-9cf8-4b52-a8da-ceb5bd7eecc7>ChannelReader`1 reader, CancellationToken cancellationToken, Func`3 receiver, Boolean deferredExecution)\n--- End of stack trace from previous location ---",
"Type": "NotImplementedException"
},
"StackTrace": "at Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.DoneSaving()\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.Serialize(Base root)\n--- End of stack trace from previous location ---\nat Xunit.Assert.RecordExceptionAsync(Func`1 testCode)"
"Type": "System.NotImplementedException",
"Message": "The method or operation is not implemented.",
"Source": "Speckle.Sdk.Serialization.Tests"
}
}
@@ -0,0 +1,8 @@
{
"Type": "System.AggregateException",
"InnerException": {
"Type": "System.NotImplementedException",
"Message": "The method or operation is not implemented.",
"Source": "Speckle.Sdk.Serialization.Tests"
}
}
@@ -0,0 +1,5 @@
{
"Type": "System.NotImplementedException",
"Message": "The method or operation is not implemented.",
"Source": "Speckle.Sdk.Serialization.Tests"
}
@@ -0,0 +1,5 @@
{
"Type": "System.NotImplementedException",
"Message": "The method or operation is not implemented.",
"Source": "Speckle.Sdk.Serialization.Tests"
}
@@ -1,30 +1,25 @@
{
"Type": "AggregateException",
"Type": "System.AggregateException",
"InnerExceptions": [
{
"Data": {},
"Message": "The method or operation is not implemented.",
"StackTrace": "at Speckle.Sdk.Serialization.Tests.ExceptionServerObjectManager.HasObjects(IReadOnlyCollection`1 objectIds, CancellationToken cancellationToken)\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.SendToServer(Batch`1 batch, CancellationToken cancellationToken)\nat Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.SendToServer(IMemoryOwner`1 batch, CancellationToken cancellationToken)\n--- End of stack trace from previous location ---\n--- End of stack trace from previous location ---\nat Open.ChannelExtensions.<28d3e838-dde6-44a1-8f5e-d1c739a178d0>Extensions.ReadUntilCancelledAsync[T](<5e816acc-9cf8-4b52-a8da-ceb5bd7eecc7>ChannelReader`1 reader, CancellationToken cancellationToken, Func`3 receiver, Boolean deferredExecution)\n--- End of stack trace from previous location ---",
"Type": "NotImplementedException"
},
"Type": "System.NotImplementedException",
"Message": "The method or operation is not implemented.",
"Source": "Speckle.Sdk.Serialization.Tests"
},
{
"Data": {},
"Message": "The method or operation is not implemented.",
"StackTrace": "at Speckle.Sdk.Serialization.Tests.ExceptionServerObjectManager.HasObjects(IReadOnlyCollection`1 objectIds, CancellationToken cancellationToken)\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.SendToServer(Batch`1 batch, CancellationToken cancellationToken)\nat Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.SendToServer(IMemoryOwner`1 batch, CancellationToken cancellationToken)\n--- End of stack trace from previous location ---\n--- End of stack trace from previous location ---\nat Open.ChannelExtensions.<28d3e838-dde6-44a1-8f5e-d1c739a178d0>Extensions.ReadUntilCancelledAsync[T](<5e816acc-9cf8-4b52-a8da-ceb5bd7eecc7>ChannelReader`1 reader, CancellationToken cancellationToken, Func`3 receiver, Boolean deferredExecution)\n--- End of stack trace from previous location ---",
"Type": "NotImplementedException"
},
"Type": "System.NotImplementedException",
"Message": "The method or operation is not implemented.",
"Source": "Speckle.Sdk.Serialization.Tests"
},
{
"Data": {},
"Message": "The method or operation is not implemented.",
"StackTrace": "at Speckle.Sdk.Serialization.Tests.ExceptionServerObjectManager.HasObjects(IReadOnlyCollection`1 objectIds, CancellationToken cancellationToken)\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.SendToServer(Batch`1 batch, CancellationToken cancellationToken)\nat Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.SendToServer(IMemoryOwner`1 batch, CancellationToken cancellationToken)\n--- End of stack trace from previous location ---\n--- End of stack trace from previous location ---\nat Open.ChannelExtensions.<28d3e838-dde6-44a1-8f5e-d1c739a178d0>Extensions.ReadUntilCancelledAsync[T](<5e816acc-9cf8-4b52-a8da-ceb5bd7eecc7>ChannelReader`1 reader, CancellationToken cancellationToken, Func`3 receiver, Boolean deferredExecution)\n--- End of stack trace from previous location ---",
"Type": "NotImplementedException"
},
"Type": "System.NotImplementedException",
"Message": "The method or operation is not implemented.",
"Source": "Speckle.Sdk.Serialization.Tests"
},
{
"Data": {},
"Message": "The method or operation is not implemented.",
"StackTrace": "at Speckle.Sdk.Serialization.Tests.ExceptionServerObjectManager.HasObjects(IReadOnlyCollection`1 objectIds, CancellationToken cancellationToken)\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.SendToServer(Batch`1 batch, CancellationToken cancellationToken)\nat Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.SendToServer(IMemoryOwner`1 batch, CancellationToken cancellationToken)\n--- End of stack trace from previous location ---\n--- End of stack trace from previous location ---\nat Open.ChannelExtensions.<28d3e838-dde6-44a1-8f5e-d1c739a178d0>Extensions.ReadUntilCancelledAsync[T](<5e816acc-9cf8-4b52-a8da-ceb5bd7eecc7>ChannelReader`1 reader, CancellationToken cancellationToken, Func`3 receiver, Boolean deferredExecution)\n--- End of stack trace from previous location ---",
"Type": "NotImplementedException"
}
],
"StackTrace": "at Speckle.Sdk.Dependencies.Serialization.ChannelSaver`1.DoneSaving()\nat Speckle.Sdk.Serialisation.V2.Send.SerializeProcess.Serialize(Base root)\n--- End of stack trace from previous location ---\nat Xunit.Assert.RecordExceptionAsync(Func`1 testCode)"
"Type": "System.NotImplementedException",
"Message": "The method or operation is not implemented.",
"Source": "Speckle.Sdk.Serialization.Tests"
}
]
}
@@ -1,11 +1,12 @@
using Microsoft.Extensions.Logging.Abstractions;
using FluentAssertions;
using Microsoft.Extensions.Logging.Abstractions;
using Speckle.Objects.Geometry;
using Speckle.Sdk.Host;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation.V2;
using Speckle.Sdk.Serialisation.V2.Receive;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.SQLite;
using Speckle.Sdk.Transports;
using Speckle.Sdk.Serialization.Tests.Framework;
using Speckle.Sdk.Testing.Framework;
namespace Speckle.Sdk.Serialization.Tests;
@@ -28,7 +29,7 @@ public class ExceptionTests
new DummySendCacheManager(objects),
new ExceptionServerObjectManager(),
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new BaseSerializer(new DummySendCacheManager(objects), new ObjectSerializerFactory(new BasePropertyGatherer())),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, false, true)
@@ -49,7 +50,7 @@ public class ExceptionTests
new ExceptionSendCacheManager(),
new DummyServerObjectManager(),
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new BaseSerializer(new ExceptionSendCacheManager(), new ObjectSerializerFactory(new BasePropertyGatherer())),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, false, true)
@@ -58,50 +59,67 @@ public class ExceptionTests
var ex = await Assert.ThrowsAsync<AggregateException>(async () => await process2.Serialize(testClass));
await Verify(ex);
}
}
public class ExceptionServerObjectManager : IServerObjectManager
{
public IAsyncEnumerable<(string, string)> DownloadObjects(
IReadOnlyCollection<string> objectIds,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
) => throw new NotImplementedException();
public Task<string?> DownloadSingleObject(
string objectId,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
) => throw new NotImplementedException();
public Task<Dictionary<string, bool>> HasObjects(
IReadOnlyCollection<string> objectIds,
CancellationToken cancellationToken
) => throw new NotImplementedException();
public Task UploadObjects(
IReadOnlyList<BaseItem> objects,
bool compressPayloads,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
) => throw new NotImplementedException();
}
public class ExceptionSendCacheManager : ISqLiteJsonCacheManager
{
public void Dispose() { }
public IReadOnlyCollection<(string Id, string Json)> GetAllObjects() => throw new NotImplementedException();
public void DeleteObject(string id) => throw new NotImplementedException();
public string? GetObject(string id) => null;
public void SaveObject(string id, string json) => throw new NotImplementedException();
public void UpdateObject(string id, string json) => throw new NotImplementedException();
public void SaveObjects(IEnumerable<(string id, string json)> items) => throw new NotImplementedException();
public bool HasObject(string objectId) => throw new NotImplementedException();
[Theory]
[InlineData("RevitObject.json.gz", "3416d3fe01c9196115514c4a2f41617b", 7818)]
public async Task Test_Exceptions_Receive_Server(string fileName, string rootId, int oldCount)
{
var closures = await TestFileManager.GetFileAsClosures(fileName);
closures.Count.Should().Be(oldCount);
var o = new ObjectLoader(new DummySqLiteReceiveManager(closures), new ExceptionServerObjectManager(), null);
using var process = new DeserializeProcess(
null,
o,
new BaseDeserializer(new ObjectDeserializerFactory()),
default,
new(true, MaxParallelism: 1)
);
var ex = await Assert.ThrowsAsync<NotImplementedException>(async () =>
{
var root = await process.Deserialize(rootId);
});
await Verify(ex);
}
[Theory]
[InlineData("RevitObject.json.gz", "3416d3fe01c9196115514c4a2f41617b", 7818, false)]
[InlineData("RevitObject.json.gz", "3416d3fe01c9196115514c4a2f41617b", 7818, true)]
public async Task Test_Exceptions_Receive_Cache(string fileName, string rootId, int oldCount, bool? hasObject)
{
var closures = await TestFileManager.GetFileAsClosures(fileName);
closures.Count.Should().Be(oldCount);
var o = new ObjectLoader(
new ExceptionSendCacheManager(hasObject),
new DummyReceiveServerObjectManager(closures),
null
);
using var process = new DeserializeProcess(
null,
o,
new BaseDeserializer(new ObjectDeserializerFactory()),
default,
new(MaxParallelism: 1)
);
Exception ex;
if (hasObject == true)
{
ex = await Assert.ThrowsAsync<NotImplementedException>(async () =>
{
var root = await process.Deserialize(rootId);
});
}
else
{
ex = await Assert.ThrowsAsync<AggregateException>(async () =>
{
var root = await process.Deserialize(rootId);
});
}
await Verify(ex).UseParameters(hasObject);
}
}
@@ -24,7 +24,7 @@ public class ExplicitInterfaceTests
new DummySendCacheManager(objects),
new DummyServerObjectManager(),
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new BaseSerializer(new DummySendCacheManager(objects), new ObjectSerializerFactory(new BasePropertyGatherer())),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(false, false, true, true)
@@ -0,0 +1,22 @@
using Speckle.Sdk.SQLite;
namespace Speckle.Sdk.Serialization.Tests.Framework;
public class ExceptionSendCacheManager(bool? hasObject = null) : ISqLiteJsonCacheManager
{
public void Dispose() { }
public IReadOnlyCollection<(string Id, string Json)> GetAllObjects() => throw new NotImplementedException();
public void DeleteObject(string id) => throw new NotImplementedException();
public string? GetObject(string id) => null;
public void SaveObject(string id, string json) => throw new NotImplementedException();
public void UpdateObject(string id, string json) => throw new NotImplementedException();
public void SaveObjects(IEnumerable<(string id, string json)> items) => throw new NotImplementedException();
public bool HasObject(string objectId) => hasObject ?? throw new NotImplementedException();
}
@@ -0,0 +1,32 @@
using Speckle.Sdk.Serialisation.V2;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Serialization.Tests.Framework;
public class ExceptionServerObjectManager : IServerObjectManager
{
public IAsyncEnumerable<(string, string)> DownloadObjects(
IReadOnlyCollection<string> objectIds,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
) => throw new NotImplementedException();
public Task<string?> DownloadSingleObject(
string objectId,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
) => throw new NotImplementedException();
public Task<Dictionary<string, bool>> HasObjects(
IReadOnlyCollection<string> objectIds,
CancellationToken cancellationToken
) => throw new NotImplementedException();
public Task UploadObjects(
IReadOnlyList<BaseItem> objects,
bool compressPayloads,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
) => throw new NotImplementedException();
}
@@ -0,0 +1,60 @@
using System.IO.Compression;
using System.Reflection;
using Speckle.Newtonsoft.Json.Linq;
using Speckle.Objects.Data;
using Speckle.Sdk.Common;
using Speckle.Sdk.Host;
using Speckle.Sdk.Models;
namespace Speckle.Sdk.Serialization.Tests.Framework;
public static class TestFileManager
{
static TestFileManager()
{
TypeLoader.Reset();
TypeLoader.Initialize(typeof(Base).Assembly, typeof(DataObject).Assembly, _assembly);
}
private static readonly Assembly _assembly = Assembly.GetExecutingAssembly();
private static readonly Dictionary<string, IReadOnlyDictionary<string, string>> _objects = new();
public static async Task<IReadOnlyDictionary<string, string>> GetFileAsClosures(string fileName)
{
if (!_objects.TryGetValue(fileName, out var closure))
{
var fullName = _assembly.GetManifestResourceNames().Single(x => x.EndsWith(fileName));
var json = await ReadJson(fullName);
closure = ReadAsObjects(json);
_objects.Add(fileName, closure);
}
return closure;
}
private static async Task<string> ReadJson(string fullName)
{
await using var stream = _assembly.GetManifestResourceStream(fullName).NotNull();
if (fullName.EndsWith(".gz"))
{
await using var z = new GZipStream(stream, CompressionMode.Decompress);
using var reader2 = new StreamReader(z);
return await reader2.ReadToEndAsync();
}
using var reader = new StreamReader(stream);
return await reader.ReadToEndAsync();
}
private static Dictionary<string, string> ReadAsObjects(string json)
{
var jsonObjects = new Dictionary<string, string>();
var array = JArray.Parse(json);
foreach (var obj in array)
{
if (obj is JObject jobj)
{
jsonObjects.Add(jobj["id"].NotNull().Value<string>().NotNull(), jobj.ToString());
}
}
return jsonObjects;
}
}
@@ -0,0 +1,35 @@
using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Serialization.Tests.Framework;
public class TestTransport(IReadOnlyDictionary<string, string> objects) : ITransport
{
public string TransportName
{
get => "Test";
set { }
}
public Dictionary<string, object> TransportContext { get; }
public TimeSpan Elapsed { get; }
public int SavedObjectCount { get; }
public CancellationToken CancellationToken { get; set; }
public IProgress<ProgressArgs>? OnProgressAction { get; set; }
public Action<string, Exception>? OnErrorAction { get; set; }
public void BeginWrite() => throw new NotImplementedException();
public void EndWrite() => throw new NotImplementedException();
public void SaveObject(string id, string serializedObject) => throw new NotImplementedException();
public Task WriteComplete() => throw new NotImplementedException();
public Task<string?> GetObject(string id) => Task.FromResult(objects.GetValueOrDefault(id));
public Task<string> CopyObjectAndChildren(string id, ITransport targetTransport) =>
throw new NotImplementedException();
public Task<Dictionary<string, bool>> HasObjects(IReadOnlyList<string> objectIds) =>
throw new NotImplementedException();
}
@@ -1,15 +1,10 @@
using Speckle.Sdk.Transports;
using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Serialization.Tests;
namespace Speckle.Sdk.Serialization.Tests.Framework;
public class TestTransport : ITransport
public class TestTransport2(IDictionary<string, string> objects) : ITransport
{
public IDictionary<string, string> Objects { get; }
public TestTransport(IDictionary<string, string> objects)
{
Objects = objects;
}
public IDictionary<string, string> Objects { get; } = objects;
public string TransportName
{
@@ -1,11 +1,8 @@
using System.Collections.Concurrent;
using System.IO.Compression;
using System.Reflection;
using FluentAssertions;
using Microsoft.Extensions.Logging.Abstractions;
using Speckle.Newtonsoft.Json;
using Speckle.Newtonsoft.Json.Linq;
using Speckle.Objects.Data;
using Speckle.Sdk.Common;
using Speckle.Sdk.Host;
using Speckle.Sdk.Models;
@@ -13,6 +10,7 @@ using Speckle.Sdk.Serialisation;
using Speckle.Sdk.Serialisation.Utilities;
using Speckle.Sdk.Serialisation.V2.Receive;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.Serialization.Tests.Framework;
using Speckle.Sdk.Testing.Framework;
namespace Speckle.Sdk.Serialization.Tests;
@@ -21,14 +19,14 @@ public class SerializationTests
{
private class TestLoader(string json) : IObjectLoader
{
public Task<(string, IReadOnlyCollection<string>)> GetAndCache(
public Task<(Json, IReadOnlyCollection<Id>)> GetAndCache(
string rootId,
DeserializeProcessOptions? options,
CancellationToken cancellationToken
)
{
var childrenIds = ClosureParser.GetChildrenIds(json).ToList();
return Task.FromResult<(string, IReadOnlyCollection<string>)>((json, childrenIds));
var childrenIds = ClosureParser.GetChildrenIds(new(json), cancellationToken).Select(x => new Id(x)).ToList();
return Task.FromResult<(Json, IReadOnlyCollection<Id>)>((new(json), childrenIds));
}
public string? LoadId(string id) => null;
@@ -36,41 +34,6 @@ public class SerializationTests
public void Dispose() { }
}
private readonly Assembly _assembly = Assembly.GetExecutingAssembly();
public SerializationTests()
{
TypeLoader.Reset();
TypeLoader.Initialize(typeof(Base).Assembly, typeof(DataObject).Assembly, _assembly);
}
private async Task<string> ReadJson(string fullName)
{
await using var stream = _assembly.GetManifestResourceStream(fullName).NotNull();
if (fullName.EndsWith(".gz"))
{
await using var z = new GZipStream(stream, CompressionMode.Decompress);
using var reader2 = new StreamReader(z);
return await reader2.ReadToEndAsync();
}
using var reader = new StreamReader(stream);
return await reader.ReadToEndAsync();
}
private Dictionary<string, string> ReadAsObjects(string json)
{
var jsonObjects = new Dictionary<string, string>();
var array = JArray.Parse(json);
foreach (var obj in array)
{
if (obj is JObject jobj)
{
jsonObjects.Add(jobj["id"].NotNull().Value<string>().NotNull(), jobj.ToString());
}
}
return jsonObjects;
}
/*
[Test]
[TestCase("RevitObject.json")]
@@ -84,9 +47,9 @@ public class SerializationTests
@base.Should().NotBeNull();
}*/
public class TestObjectLoader(Dictionary<string, string> idToObject) : IObjectLoader
public class TestObjectLoader(IReadOnlyDictionary<string, string> idToObject) : IObjectLoader
{
public Task<(string, IReadOnlyCollection<string>)> GetAndCache(
public Task<(Json, IReadOnlyCollection<Id>)> GetAndCache(
string rootId,
DeserializeProcessOptions? options,
CancellationToken cancellationToken
@@ -98,8 +61,8 @@ public class SerializationTests
throw new KeyNotFoundException("Root not found");
}
var allChildren = ClosureParser.GetChildrenIds(json).ToList();
return Task.FromResult<(string, IReadOnlyCollection<string>)>((json, allChildren));
var allChildren = ClosureParser.GetChildrenIds(json, cancellationToken).Select(x => new Id(x)).ToList();
return Task.FromResult<(Json, IReadOnlyCollection<Id>)>((new(json), allChildren));
}
public string? LoadId(string id) => idToObject.GetValueOrDefault(id);
@@ -111,16 +74,14 @@ public class SerializationTests
[InlineData("RevitObject.json.gz")]
public async Task Basic_Namespace_Validation(string fileName)
{
var fullName = _assembly.GetManifestResourceNames().Single(x => x.EndsWith(fileName));
var json = await ReadJson(fullName);
var closure = ReadAsObjects(json);
var closures = await TestFileManager.GetFileAsClosures(fileName);
var deserializer = new SpeckleObjectDeserializer
{
ReadTransport = new TestTransport(closure),
ReadTransport = new TestTransport(closures),
CancellationToken = default,
};
foreach (var (id, objJson) in closure)
foreach (var (id, objJson) in closures)
{
var jObject = JObject.Parse(objJson);
var oldSpeckleType = jObject["speckle_type"].NotNull().Value<string>().NotNull();
@@ -153,11 +114,14 @@ public class SerializationTests
[InlineData("RevitObject.json.gz")]
public async Task Basic_Namespace_Validation_New(string fileName)
{
var fullName = _assembly.GetManifestResourceNames().Single(x => x.EndsWith(fileName));
var json = await ReadJson(fullName);
var closures = ReadAsObjects(json);
using var process = new DeserializeProcess(null, new TestObjectLoader(closures), new ObjectDeserializerFactory());
await process.Deserialize("3416d3fe01c9196115514c4a2f41617b", default);
var closures = await TestFileManager.GetFileAsClosures(fileName);
using var process = new DeserializeProcess(
null,
new TestObjectLoader(closures),
new BaseDeserializer(new ObjectDeserializerFactory()),
default
);
await process.Deserialize("3416d3fe01c9196115514c4a2f41617b");
foreach (var (id, objJson) in closures)
{
var jObject = JObject.Parse(objJson);
@@ -172,7 +136,7 @@ public class SerializationTests
}
else
{
var baseType = process.BaseCache[id];
var baseType = process.BaseCache[new Id(id)];
starts = baseType.speckle_type.StartsWith("Speckle.Core.") || baseType.speckle_type.StartsWith("Objects.");
starts.Should().BeTrue($"{baseType.speckle_type} isn't expected");
@@ -208,23 +172,21 @@ public class SerializationTests
[InlineData("RevitObject.json.gz", "3416d3fe01c9196115514c4a2f41617b", 7818)]
public async Task Roundtrip_Test_Old(string fileName, string _, int count)
{
var fullName = _assembly.GetManifestResourceNames().Single(x => x.EndsWith(fileName));
var json = await ReadJson(fullName);
var closure = ReadAsObjects(json);
var closures = await TestFileManager.GetFileAsClosures(fileName);
var deserializer = new SpeckleObjectDeserializer
{
ReadTransport = new TestTransport(closure),
ReadTransport = new TestTransport(closures),
CancellationToken = default,
};
var writtenObjects = new Dictionary<string, string>();
var writeTransport = new TestTransport(writtenObjects);
var writeTransport = new TestTransport2(writtenObjects);
var serializer = new SpeckleObjectSerializer([writeTransport]);
var newIds = new Dictionary<string, string>();
var oldIds = new Dictionary<string, string>();
var idToBase = new Dictionary<string, Base>();
closure.Count.Should().Be(count);
foreach (var (id, objJson) in closure)
closures.Count.Should().Be(count);
foreach (var (id, objJson) in closures)
{
var base1 = await deserializer.DeserializeAsync(objJson);
base1.id.Should().Be(id);
@@ -244,18 +206,22 @@ public class SerializationTests
[InlineData("RevitObject.json.gz", "3416d3fe01c9196115514c4a2f41617b", 7818, 4674)]
public async Task Roundtrip_Test_New(string fileName, string rootId, int oldCount, int newCount)
{
var fullName = _assembly.GetManifestResourceNames().Single(x => x.EndsWith(fileName));
var json = await ReadJson(fullName);
var closure = ReadAsObjects(json);
closure.Count.Should().Be(oldCount);
var closures = await TestFileManager.GetFileAsClosures(fileName);
closures.Count.Should().Be(oldCount);
var o = new ObjectLoader(
new DummySqLiteReceiveManager(closure),
new DummyReceiveServerObjectManager(closure),
new DummySqLiteReceiveManager(closures),
new DummyReceiveServerObjectManager(closures),
null
);
using var process = new DeserializeProcess(null, o, new ObjectDeserializerFactory(), new(true));
var root = await process.Deserialize(rootId, default);
using var process = new DeserializeProcess(
null,
o,
new BaseDeserializer(new ObjectDeserializerFactory()),
default,
new(true)
);
var root = await process.Deserialize(rootId);
process.BaseCache.Count.Should().Be(oldCount);
process.Total.Should().Be(oldCount);
@@ -265,7 +231,7 @@ public class SerializationTests
new DummySqLiteSendManager(),
new DummySendServerObjectManager(newIdToJson),
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new BaseSerializer(new DummySqLiteSendManager(), new ObjectSerializerFactory(new BasePropertyGatherer())),
new NullLoggerFactory(),
default,
new SerializeProcessOptions(true, true, false, true)
@@ -277,7 +243,7 @@ public class SerializationTests
foreach (var newKvp in newIdToJson)
{
if (closure.TryGetValue(newKvp.Key, out var newValue))
if (closures.TryGetValue(newKvp.Key, out var newValue))
{
JToken.DeepEquals(JObject.Parse(newValue), JObject.Parse(newKvp.Value));
}
@@ -57,8 +57,14 @@ public class GeneralDeserializer : IDisposable
null
);
var o = new ObjectLoader(sqlite, serverObjects, null);
using var process = new DeserializeProcess(null, o, new ObjectDeserializerFactory(), new(skipCache));
return await process.Deserialize(rootId, default).ConfigureAwait(false);
using var process = new DeserializeProcess(
null,
o,
new BaseDeserializer(new ObjectDeserializerFactory()),
default,
new(skipCache)
);
return await process.Deserialize(rootId).ConfigureAwait(false);
}
/*
@@ -55,7 +55,7 @@ public class GeneralReceiveTest : IDisposable
[Benchmark]
public async Task<Base> RunTest_Receive2()
{
return await _operations.Receive2(_baseUrl, streamId, rootId, null);
return await _operations.Receive2(_baseUrl, streamId, rootId, null, null, default);
}
[GlobalCleanup]