diff --git a/src/Speckle.Sdk.Dependencies/Pools.cs b/src/Speckle.Sdk.Dependencies/Pools.cs index d4721910..86bfe7c1 100644 --- a/src/Speckle.Sdk.Dependencies/Pools.cs +++ b/src/Speckle.Sdk.Dependencies/Pools.cs @@ -21,5 +21,31 @@ public static class Pools public static Pool StringBuilders { get; } = new(new StringBuilderPooledObjectPolicy() { MaximumRetainedCapacity = 100 * 1024 * 1024 }); - public static Pool> CreateListPool() => new(new DefaultPooledObjectPolicy>()); + private sealed class ObjectDictionaryPolicy : IPooledObjectPolicy> + where TKey : notnull + { + public Dictionary Create() => new(50); + + public bool Return(Dictionary obj) + { + obj.Clear(); + return true; + } + } + + private sealed class ObjectListPolicy : IPooledObjectPolicy> + { + public List Create() => new(50); + + public bool Return(List obj) + { + obj.Clear(); + return true; + } + } + + public static Pool> CreateListPool() => new(new ObjectListPolicy()); + + public static Pool> CreateDictionaryPool() + where TKey : notnull => new(new ObjectDictionaryPolicy()); } diff --git a/src/Speckle.Sdk.Dependencies/Serialization/ChannelExtensions.cs b/src/Speckle.Sdk.Dependencies/Serialization/ChannelExtensions.cs index 2154b26c..cb56850c 100644 --- a/src/Speckle.Sdk.Dependencies/Serialization/ChannelExtensions.cs +++ b/src/Speckle.Sdk.Dependencies/Serialization/ChannelExtensions.cs @@ -1,18 +1,18 @@ using System.Threading.Channels; using Open.ChannelExtensions; -using Speckle.Sdk.Dependencies.Serialization; namespace Speckle.Sdk.Serialisation.V2.Send; public static class ChannelExtensions { - public static BatchingChannelReader> BatchBySize( - this ChannelReader source, + public static BatchingChannelReader> BatchBySize( + this ChannelReader source, int batchSize, bool singleReader = false, bool allowSynchronousContinuations = false - ) => - new SizeBatchingChannelReader( + ) + where T : IHasSize => + new SizeBatchingChannelReader( source ?? throw new ArgumentNullException(nameof(source)), batchSize, singleReader, diff --git a/src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs b/src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs index 447440fd..e9d6e729 100644 --- a/src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs +++ b/src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs @@ -2,7 +2,7 @@ namespace Speckle.Sdk.Dependencies.Serialization; -public abstract class ChannelLoader +public abstract class ChannelLoader { private const int HTTP_GET_CHUNK_SIZE = 500; private const int MAX_PARALLELISM_HTTP = 4; @@ -27,7 +27,7 @@ public abstract class ChannelLoader public abstract string? CheckCache(string id); - public abstract Task> Download(List ids); + public abstract Task> Download(List ids); - public abstract void SaveToCache(List x); + public abstract void SaveToCache(List x); } diff --git a/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs b/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs index 64d8bfb4..4f974eda 100644 --- a/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs +++ b/src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs @@ -1,27 +1,11 @@ -using System.Text; using System.Threading.Channels; using Open.ChannelExtensions; using Speckle.Sdk.Serialisation.V2.Send; namespace Speckle.Sdk.Dependencies.Serialization; -public readonly record struct BaseItem(string Id, string Json, bool NeedsStorage) -{ - public int Size { get; } = Encoding.UTF8.GetByteCount(Json); - - public bool Equals(BaseItem? other) - { - if (other is null) - { - return false; - } - return string.Equals(Id, other.Value.Id, StringComparison.OrdinalIgnoreCase); - } - - public override int GetHashCode() => Id.GetHashCode(); -} - -public abstract class ChannelSaver +public abstract class ChannelSaver + where T : IHasSize { private const int SEND_CAPACITY = 50; private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes @@ -31,7 +15,9 @@ public abstract class ChannelSaver private const int MAX_CACHE_WRITE_PARALLELISM = 1; private const int MAX_CACHE_BATCH = 200; - private readonly Channel _checkCacheChannel = Channel.CreateBounded( + private bool _enabled; + + private readonly Channel _checkCacheChannel = Channel.CreateBounded( new BoundedChannelOptions(SEND_CAPACITY) { AllowSynchronousContinuations = true, @@ -43,35 +29,60 @@ public abstract class ChannelSaver _ => throw new NotImplementedException("Dropping items not supported.") ); - public Task Start(CancellationToken cancellationToken = default) + public Task Start( + bool enableServerSending = true, + bool enableCacheSaving = true, + CancellationToken cancellationToken = default + ) { - var t = _checkCacheChannel - .Reader.BatchBySize(HTTP_SEND_CHUNK_SIZE) - .WithTimeout(HTTP_BATCH_TIMEOUT) - .PipeAsync( - MAX_PARALLELISM_HTTP, - async x => await SendToServer(x, cancellationToken).ConfigureAwait(false), - HTTP_CAPACITY, - false, - cancellationToken - ) - .Join() - .Batch(MAX_CACHE_BATCH) - .WithTimeout(HTTP_BATCH_TIMEOUT) - .ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken); - return t; + ValueTask t = new(Task.FromResult(0L)); + if (enableServerSending) + { + _enabled = true; + var tChannelReader = _checkCacheChannel + .Reader.BatchBySize(HTTP_SEND_CHUNK_SIZE) + .WithTimeout(HTTP_BATCH_TIMEOUT) + .PipeAsync( + MAX_PARALLELISM_HTTP, + async x => await SendToServer(x, cancellationToken).ConfigureAwait(false), + HTTP_CAPACITY, + false, + cancellationToken + ); + if (enableCacheSaving) + { + t = new( + tChannelReader + .Join() + .Batch(MAX_CACHE_BATCH) + .WithTimeout(HTTP_BATCH_TIMEOUT) + .ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken) + ); + } + else + { + t = tChannelReader.ReadUntilCancelledAsync(cancellationToken, (list, l) => new ValueTask()); + } + } + + return t.AsTask(); } - public async Task Save(BaseItem item, CancellationToken cancellationToken = default) => - await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false); + public async ValueTask Save(T item, CancellationToken cancellationToken = default) + { + if (_enabled) + { + await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false); + } + } - public abstract Task> SendToServer(List batch, CancellationToken cancellationToken); + public abstract Task> SendToServer(List batch, CancellationToken cancellationToken); - public Task Done() + public ValueTask Done() { _checkCacheChannel.Writer.Complete(); - return Task.CompletedTask; + return new(Task.CompletedTask); } - public abstract void SaveToCache(List item); + public abstract void SaveToCache(List item); } diff --git a/src/Speckle.Sdk.Dependencies/Serialization/SizeBatchingChannelReader.cs b/src/Speckle.Sdk.Dependencies/Serialization/SizeBatchingChannelReader.cs index c824e9fb..9c668547 100644 --- a/src/Speckle.Sdk.Dependencies/Serialization/SizeBatchingChannelReader.cs +++ b/src/Speckle.Sdk.Dependencies/Serialization/SizeBatchingChannelReader.cs @@ -1,28 +1,33 @@ using System.Threading.Channels; using Open.ChannelExtensions; -using Speckle.Sdk.Dependencies.Serialization; namespace Speckle.Sdk.Serialisation.V2.Send; -public class SizeBatchingChannelReader( - ChannelReader source, +public interface IHasSize +{ + int Size { get; } +} + +public class SizeBatchingChannelReader( + ChannelReader source, int batchSize, bool singleReader, bool syncCont = false -) : BatchingChannelReader>(source, batchSize, singleReader, syncCont) +) : BatchingChannelReader>(source, batchSize, singleReader, syncCont) + where T : IHasSize { private readonly int _batchSize = batchSize; - protected override List CreateBatch(int capacity) => new(); + protected override List CreateBatch(int capacity) => new(); - protected override void TrimBatch(List batch) => batch.TrimExcess(); + protected override void TrimBatch(List batch) => batch.TrimExcess(); - protected override void AddBatchItem(List batch, BaseItem item) => batch.Add(item); + protected override void AddBatchItem(List batch, T item) => batch.Add(item); - protected override int GetBatchSize(List batch) + protected override int GetBatchSize(List batch) { int size = 0; - foreach (BaseItem item in batch) + foreach (T item in batch) { size += item.Size; } diff --git a/src/Speckle.Sdk/Serialisation/V2/DummySendServerObjectManager.cs b/src/Speckle.Sdk/Serialisation/V2/DummySendServerObjectManager.cs index e50ef75e..950a8667 100644 --- a/src/Speckle.Sdk/Serialisation/V2/DummySendServerObjectManager.cs +++ b/src/Speckle.Sdk/Serialisation/V2/DummySendServerObjectManager.cs @@ -1,5 +1,5 @@ using System.Text; -using Speckle.Sdk.Dependencies.Serialization; +using Speckle.Sdk.Serialisation.V2.Send; using Speckle.Sdk.SQLite; using Speckle.Sdk.Transports; @@ -23,7 +23,7 @@ public class DummySqLiteJsonCacheManager : ISqLiteJsonCacheManager public class DummySendServerObjectManager : IServerObjectManager { public IAsyncEnumerable<(string, string)> DownloadObjects( - IReadOnlyList objectIds, + IReadOnlyCollection objectIds, IProgress? progress, CancellationToken cancellationToken ) => throw new NotImplementedException(); @@ -35,7 +35,7 @@ public class DummySendServerObjectManager : IServerObjectManager ) => throw new NotImplementedException(); public Task> HasObjects( - IReadOnlyList objectIds, + IReadOnlyCollection objectIds, CancellationToken cancellationToken ) => throw new NotImplementedException(); @@ -49,7 +49,7 @@ public class DummySendServerObjectManager : IServerObjectManager long totalBytes = 0; foreach (var item in objects) { - totalBytes += Encoding.Default.GetByteCount(item.Json); + totalBytes += Encoding.Default.GetByteCount(item.Json.Value); } progress?.Report(new(ProgressEvent.UploadBytes, totalBytes, totalBytes)); diff --git a/src/Speckle.Sdk/Serialisation/V2/Receive/ObjectLoader.cs b/src/Speckle.Sdk/Serialisation/V2/Receive/ObjectLoader.cs index b55e30ba..7201c4d0 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Receive/ObjectLoader.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Receive/ObjectLoader.cs @@ -3,6 +3,7 @@ using Speckle.Sdk.Common; using Speckle.Sdk.Dependencies; using Speckle.Sdk.Dependencies.Serialization; using Speckle.Sdk.Serialisation.Utilities; +using Speckle.Sdk.Serialisation.V2.Send; using Speckle.Sdk.SQLite; using Speckle.Sdk.Transports; @@ -13,7 +14,7 @@ public sealed class ObjectLoader( ISqLiteJsonCacheManager sqLiteJsonCacheManager, IServerObjectManager serverObjectManager, IProgress? progress -) : ChannelLoader, IObjectLoader +) : ChannelLoader, IObjectLoader { private int? _allChildrenCount; private long _checkCache; @@ -80,13 +81,13 @@ public sealed class ObjectLoader( var (id, json) in serverObjectManager.DownloadObjects(ids.Select(x => x.NotNull()).ToList(), progress, default) ) { - toCache.Add(new(id, json, true)); + toCache.Add(new(new(id), new(json), true, null)); } if (toCache.Count != ids.Count) { throw new SpeckleException( - $"Objects in batch missing: {string.Join(",", ids.Except(toCache.Select(y => y.Id)).Take(10))}" + $"Objects in batch missing: {string.Join(",", ids.Except(toCache.Select(y => y.Id.Value)).Take(10))}" ); } return toCache; @@ -97,7 +98,7 @@ public sealed class ObjectLoader( { if (!_options.SkipCache) { - sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id, x.Json))); + sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id.Value, x.Json.Value))); Interlocked.Exchange(ref _cached, _cached + batch.Count); progress?.Report(new(ProgressEvent.CachedToLocal, _cached, _allChildrenCount)); } diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializer.cs b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializer.cs index 7df0d7ac..2777fdfc 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializer.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializer.cs @@ -9,19 +9,25 @@ using Speckle.Sdk.Dependencies; using Speckle.Sdk.Helpers; using Speckle.Sdk.Models; using Speckle.Sdk.Serialisation.Utilities; -using Closures = System.Collections.Generic.IReadOnlyDictionary; +using Closures = System.Collections.Generic.Dictionary; namespace Speckle.Sdk.Serialisation.V2.Send; -public readonly record struct CacheInfo(Json Json, Closures Closures); +public readonly record struct NodeInfo(Json Json, Closures? C) +{ + public Closures GetClosures() => + C ?? ClosureParser.GetClosures(Json.Value).ToDictionary(x => new Id(x.Item1), x => x.Item2); +} + +public partial interface IObjectSerializer : IDisposable; [GenerateAutoInterface] -public class ObjectSerializer : IObjectSerializer +public sealed class ObjectSerializer : IObjectSerializer { private HashSet _parentObjects = new(); private readonly Dictionary _currentClosures = new(); - private readonly IDictionary _baseCache; + private readonly IReadOnlyDictionary _childCache; private readonly bool _trackDetachedChildren; private readonly IBasePropertyGatherer _propertyGatherer; @@ -33,7 +39,14 @@ public class ObjectSerializer : IObjectSerializer /// public Dictionary ObjectReferences { get; } = new(); - private readonly List<(Id, Json)> _chunks = new(); + private readonly List<(Id, Json, Closures)> _chunks; + private readonly Pool> _chunksPool; + + private readonly List> _chunks2 = new(); + private readonly Pool> _chunks2Pool; + + private readonly List> _chunks3 = new(); + private readonly Pool> _chunks3Pool; /// /// Creates a new Serializer instance. @@ -42,22 +55,43 @@ public class ObjectSerializer : IObjectSerializer /// public ObjectSerializer( IBasePropertyGatherer propertyGatherer, - IDictionary baseCache, + IReadOnlyDictionary childCache, + Pool> chunksPool, + Pool> chunks2Pool, + Pool> chunks3Pool, bool trackDetachedChildren = false, CancellationToken cancellationToken = default ) { _propertyGatherer = propertyGatherer; - _baseCache = baseCache; + _childCache = childCache; + _chunksPool = chunksPool; + _chunks2Pool = chunks2Pool; + _chunks3Pool = chunks3Pool; _cancellationToken = cancellationToken; _trackDetachedChildren = trackDetachedChildren; + _chunks = chunksPool.Get(); + } + + [AutoInterfaceIgnore] + public void Dispose() + { + _chunksPool.Return(_chunks); + foreach (var c2 in _chunks2) + { + _chunks2Pool.Return(c2); + } + foreach (var c3 in _chunks3) + { + _chunks3Pool.Return(c3); + } } /// The object to serialize /// The serialized JSON /// The serializer is busy (already serializing an object) /// Failed to extract (pre-serialize) properties from the - public IEnumerable<(Id, Json)> Serialize(Base baseObj) + public IEnumerable<(Id, Json, Closures)> Serialize(Base baseObj) { try { @@ -70,8 +104,7 @@ public class ObjectSerializer : IObjectSerializer { throw new SpeckleSerializeException($"Failed to extract (pre-serialize) properties from the {baseObj}", ex); } - _baseCache[baseObj] = new(item.Item2, _currentClosures); - yield return (item.Item1, item.Item2); + yield return (item.Item1, item.Item2, _currentClosures); foreach (var chunk in _chunks) { yield return chunk; @@ -232,27 +265,6 @@ public class ObjectSerializer : IObjectSerializer { return null; } - Closures childClosures; - Id id; - Json json; - //avoid multiple serialization to get closures - if (_baseCache.TryGetValue(baseObj, out var info)) - { - id = new Id(baseObj.id.NotNull()); - childClosures = info.Closures; - json = info.Json; - MergeClosures(_currentClosures, childClosures); - } - else - { - childClosures = isRoot || inheritedDetachInfo.IsDetachable ? _currentClosures : []; - var sb = Pools.StringBuilders.Get(); - using var writer = new StringWriter(sb); - using var jsonWriter = SpeckleObjectSerializerPool.Instance.GetJsonTextWriter(writer); - id = SerializeBaseObject(baseObj, jsonWriter, childClosures); - json = new Json(writer.ToString()); - Pools.StringBuilders.Return(sb); - } _parentObjects.Remove(baseObj); @@ -266,6 +278,27 @@ public class ObjectSerializer : IObjectSerializer if (inheritedDetachInfo.IsDetachable) { + Closures childClosures; + Id id; + Json json; + //avoid multiple serialization to get closures + if (baseObj.id != null && _childCache.TryGetValue(new(baseObj.id), out var info)) + { + id = new Id(baseObj.id); + childClosures = info.GetClosures(); + json = info.Json; + MergeClosures(_currentClosures, childClosures); + } + else + { + childClosures = isRoot || inheritedDetachInfo.IsDetachable ? _currentClosures : []; + var sb = Pools.StringBuilders.Get(); + using var writer = new StringWriter(sb); + using var jsonWriter = SpeckleObjectSerializerPool.Instance.GetJsonTextWriter(writer); + id = SerializeBaseObject(baseObj, jsonWriter, childClosures); + json = new Json(writer.ToString()); + Pools.StringBuilders.Return(sb); + } var json2 = ReferenceGenerator.CreateReference(id); AddClosure(id); // add to obj refs to return @@ -278,10 +311,20 @@ public class ObjectSerializer : IObjectSerializer closure = childClosures.ToDictionary(x => x.Key.Value, x => x.Value), }; } - _chunks.Add(new(id, json)); + _chunks.Add(new(id, json, [])); return new(id, json2); } - return new(id, json); + else + { + var childClosures = isRoot || inheritedDetachInfo.IsDetachable ? _currentClosures : []; + var sb = Pools.StringBuilders.Get(); + using var writer = new StringWriter(sb); + using var jsonWriter = SpeckleObjectSerializerPool.Instance.GetJsonTextWriter(writer); + var id = SerializeBaseObject(baseObj, jsonWriter, childClosures); + var json = new Json(writer.ToString()); + Pools.StringBuilders.Return(sb); + return new(id, json); + } } private Id SerializeBaseObject(Base baseObj, JsonWriter writer, Closures closure) @@ -334,12 +377,21 @@ public class ObjectSerializer : IObjectSerializer return id; } + private List GetChunk() + { + var chunk = _chunks3Pool.Get(); + _chunks3.Add(chunk); + return chunk; + } + private void SerializeOrChunkProperty(object? baseValue, JsonWriter jsonWriter, PropertyAttributeInfo detachInfo) { if (baseValue is IEnumerable chunkableCollection && detachInfo.IsChunkable) { - List chunks = new(); - DataChunk crtChunk = new() { data = new List(detachInfo.ChunkSize) }; + List chunks = _chunks2Pool.Get(); + _chunks2.Add(chunks); + + DataChunk crtChunk = new() { data = GetChunk() }; foreach (object element in chunkableCollection) { @@ -347,7 +399,7 @@ public class ObjectSerializer : IObjectSerializer if (crtChunk.data.Count >= detachInfo.ChunkSize) { chunks.Add(crtChunk); - crtChunk = new DataChunk { data = new List(detachInfo.ChunkSize) }; + crtChunk = new DataChunk { data = GetChunk() }; } } diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializerFactory.cs b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializerFactory.cs index 2fa75d05..9c5bbc8a 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializerFactory.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializerFactory.cs @@ -1,11 +1,17 @@ using Speckle.InterfaceGenerator; +using Speckle.Sdk.Dependencies; using Speckle.Sdk.Models; +using Closures = System.Collections.Generic.Dictionary; namespace Speckle.Sdk.Serialisation.V2.Send; [GenerateAutoInterface] public class ObjectSerializerFactory(IBasePropertyGatherer propertyGatherer) : IObjectSerializerFactory { - public IObjectSerializer Create(IDictionary baseCache, CancellationToken cancellationToken) => - new ObjectSerializer(propertyGatherer, baseCache, true, cancellationToken); + private readonly Pool> _chunkPool = Pools.CreateListPool<(Id, Json, Closures)>(); + private readonly Pool> _chunk2Pool = Pools.CreateListPool(); + private readonly Pool> _chunk3Pool = Pools.CreateListPool(); + + public IObjectSerializer Create(IReadOnlyDictionary baseCache, CancellationToken cancellationToken) => + new ObjectSerializer(propertyGatherer, baseCache, _chunkPool, _chunk2Pool, _chunk3Pool, true, cancellationToken); } diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs b/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs index 42873c7a..564e47f7 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Text; using Speckle.InterfaceGenerator; using Speckle.Sdk.Common; using Speckle.Sdk.Dependencies; @@ -6,14 +7,17 @@ using Speckle.Sdk.Dependencies.Serialization; using Speckle.Sdk.Models; using Speckle.Sdk.SQLite; using Speckle.Sdk.Transports; +using Closures = System.Collections.Generic.Dictionary; namespace Speckle.Sdk.Serialisation.V2.Send; public record SerializeProcessOptions( - bool SkipCacheRead, - bool SkipCacheWrite, - bool SkipServer, - bool SkipFindTotalObjects + bool SkipCacheRead = false, + bool SkipCacheWrite = false, + bool SkipServer = false, + bool SkipFindTotalObjects = false, + bool EnableServerSending = true, + bool EnableCacheSaving = true ); public readonly record struct SerializeProcessResults( @@ -21,6 +25,22 @@ public readonly record struct SerializeProcessResults( IReadOnlyDictionary ConvertedReferences ); +public readonly record struct BaseItem(Id Id, Json Json, bool NeedsStorage, Closures? Closures) : IHasSize +{ + public int Size { get; } = Encoding.UTF8.GetByteCount(Json.Value); + + public bool Equals(BaseItem? other) + { + if (other is null) + { + return false; + } + return string.Equals(Id.Value, other.Value.Id.Value, StringComparison.OrdinalIgnoreCase); + } + + public override int GetHashCode() => Id.GetHashCode(); +} + [GenerateAutoInterface] public class SerializeProcess( IProgress? progress, @@ -29,14 +49,13 @@ public class SerializeProcess( IBaseChildFinder baseChildFinder, IObjectSerializerFactory objectSerializerFactory, SerializeProcessOptions? options = null -) : ChannelSaver, ISerializeProcess +) : ChannelSaver, ISerializeProcess { private readonly SerializeProcessOptions _options = options ?? new(false, false, false, false); - //cache bases and closure info to avoid reserialization - private readonly IDictionary _baseCache = new ConcurrentDictionary(); private readonly ConcurrentDictionary _objectReferences = new(); - private readonly Pool> _pool = Pools.CreateListPool<(Id, Json)>(); + private readonly Pool> _pool = Pools.CreateListPool<(Id, Json, Closures)>(); + private readonly Pool> _childClosurePool = Pools.CreateDictionaryPool(); private long _objectCount; private long _objectsFound; @@ -48,7 +67,7 @@ public class SerializeProcess( public async Task Serialize(Base root, CancellationToken cancellationToken) { - var channelTask = Start(cancellationToken); + var channelTask = Start(_options.EnableServerSending, _options.EnableCacheSaving, cancellationToken); var findTotalObjectsTask = Task.CompletedTask; if (!_options.SkipFindTotalObjects) { @@ -76,9 +95,9 @@ public class SerializeProcess( } } - private async Task Traverse(Base obj, bool isEnd, CancellationToken cancellationToken) + private async Task> Traverse(Base obj, bool isEnd, CancellationToken cancellationToken) { - var tasks = new List(); + var tasks = new List>>(); foreach (var child in baseChildFinder.GetChildren(obj)) { // tmp is necessary because of the way closures close over loop variables @@ -98,8 +117,19 @@ public class SerializeProcess( { await Task.WhenAll(tasks).ConfigureAwait(false); } + var childClosures = _childClosurePool.Get(); + foreach (var t in tasks) + { + var childClosure = t.Result; + foreach (var kvp in childClosure) + { + childClosures[kvp.Key] = kvp.Value; + } + } - var items = Serialise(obj, cancellationToken); + var items = Serialise(obj, childClosures, cancellationToken); + + var currentClosures = new Dictionary(); Interlocked.Increment(ref _objectCount); progress?.Report(new(ProgressEvent.FromCacheOrSerialized, _objectCount, _objectsFound)); foreach (var item in items) @@ -108,28 +138,40 @@ public class SerializeProcess( { await Save(item, cancellationToken).ConfigureAwait(false); } + + if (!currentClosures.ContainsKey(item.Id)) + { + currentClosures.Add(item.Id, new NodeInfo(item.Json, item.Closures)); + } } + _childClosurePool.Return(childClosures); if (isEnd) { await Done().ConfigureAwait(false); } + + return currentClosures; } //leave this sync - private IEnumerable Serialise(Base obj, CancellationToken cancellationToken) + private IEnumerable Serialise( + Base obj, + IReadOnlyDictionary childInfo, + CancellationToken cancellationToken + ) { if (!_options.SkipCacheRead && obj.id != null) { var cachedJson = sqLiteJsonCacheManager.GetObject(obj.id); if (cachedJson != null) { - yield return new BaseItem(obj.id.NotNull(), cachedJson, false); + yield return new BaseItem(new(obj.id.NotNull()), new(cachedJson), false, null); yield break; } } - var serializer2 = objectSerializerFactory.Create(_baseCache, cancellationToken); + using var serializer2 = objectSerializerFactory.Create(childInfo, cancellationToken); var items = _pool.Get(); try { @@ -140,11 +182,11 @@ public class SerializeProcess( _objectReferences.TryAdd(kvp.Key, kvp.Value); } - var (id, json) = items.First(); - yield return CheckCache(id, json); - foreach (var (cid, cJson) in items.Skip(1)) + var (id, json, closures) = items.First(); + yield return CheckCache(id, json, closures); + foreach (var (cid, cJson, cClosures) in items.Skip(1)) { - yield return CheckCache(cid, cJson); + yield return CheckCache(cid, cJson, cClosures); } } finally @@ -153,17 +195,17 @@ public class SerializeProcess( } } - private BaseItem CheckCache(Id id, Json json) + private BaseItem CheckCache(Id id, Json json, Dictionary closures) { if (!_options.SkipCacheRead) { var cachedJson = sqLiteJsonCacheManager.GetObject(id.Value); if (cachedJson != null) { - return new BaseItem(id.Value, cachedJson, false); + return new BaseItem(id, new(cachedJson), false, null); } } - return new BaseItem(id.Value, json.Value, true); + return new BaseItem(id, json, true, closures); } public override async Task> SendToServer(List batch, CancellationToken cancellationToken) @@ -172,9 +214,9 @@ public class SerializeProcess( { var objectBatch = batch.Distinct().ToList(); var hasObjects = await serverObjectManager - .HasObjects(objectBatch.Select(x => x.Id).ToList(), cancellationToken) + .HasObjects(objectBatch.Select(x => x.Id.Value).Freeze(), cancellationToken) .ConfigureAwait(false); - objectBatch = batch.Where(x => !hasObjects[x.Id]).ToList(); + objectBatch = batch.Where(x => !hasObjects[x.Id.Value]).ToList(); if (objectBatch.Count != 0) { await serverObjectManager.UploadObjects(objectBatch, true, progress, cancellationToken).ConfigureAwait(false); @@ -190,7 +232,7 @@ public class SerializeProcess( { if (!_options.SkipCacheWrite && batch.Count != 0) { - sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id, x.Json))); + sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id.Value, x.Json.Value))); Interlocked.Exchange(ref _cached, _cached + batch.Count); progress?.Report(new(ProgressEvent.CachedToLocal, _cached, _objectsSerialized)); } diff --git a/src/Speckle.Sdk/Serialisation/V2/ServerObjectManager.cs b/src/Speckle.Sdk/Serialisation/V2/ServerObjectManager.cs index d5833868..c957087f 100644 --- a/src/Speckle.Sdk/Serialisation/V2/ServerObjectManager.cs +++ b/src/Speckle.Sdk/Serialisation/V2/ServerObjectManager.cs @@ -5,9 +5,9 @@ using System.Text; using Speckle.InterfaceGenerator; using Speckle.Newtonsoft.Json; using Speckle.Sdk.Common; -using Speckle.Sdk.Dependencies.Serialization; using Speckle.Sdk.Helpers; using Speckle.Sdk.Logging; +using Speckle.Sdk.Serialisation.V2.Send; using Speckle.Sdk.Transports; using Speckle.Sdk.Transports.ServerUtils; @@ -42,7 +42,7 @@ public class ServerObjectManager : IServerObjectManager } public async IAsyncEnumerable<(string, string)> DownloadObjects( - IReadOnlyList objectIds, + IReadOnlyCollection objectIds, IProgress? progress, [EnumeratorCancellation] CancellationToken cancellationToken ) @@ -139,7 +139,7 @@ public class ServerObjectManager : IServerObjectManager } public async Task> HasObjects( - IReadOnlyList objectIds, + IReadOnlyCollection objectIds, CancellationToken cancellationToken ) { diff --git a/tests/Speckle.Sdk.Serialization.Tests/DetachedTests.cs b/tests/Speckle.Sdk.Serialization.Tests/DetachedTests.cs index 9a9cf5db..c48021f1 100644 --- a/tests/Speckle.Sdk.Serialization.Tests/DetachedTests.cs +++ b/tests/Speckle.Sdk.Serialization.Tests/DetachedTests.cs @@ -336,7 +336,7 @@ public class SamplePropBase2 : Base public class DummyServerObjectManager : IServerObjectManager { public IAsyncEnumerable<(string, string)> DownloadObjects( - IReadOnlyList objectIds, + IReadOnlyCollection objectIds, IProgress? progress, CancellationToken cancellationToken ) => throw new NotImplementedException(); @@ -348,7 +348,7 @@ public class DummyServerObjectManager : IServerObjectManager ) => throw new NotImplementedException(); public Task> HasObjects( - IReadOnlyList objectIds, + IReadOnlyCollection objectIds, CancellationToken cancellationToken ) => throw new NotImplementedException(); @@ -362,7 +362,7 @@ public class DummyServerObjectManager : IServerObjectManager long totalBytes = 0; foreach (var item in objects) { - totalBytes += Encoding.Default.GetByteCount(item.Json); + totalBytes += Encoding.Default.GetByteCount(item.Json.Value); } progress?.Report(new(ProgressEvent.UploadBytes, totalBytes, totalBytes)); diff --git a/tests/Speckle.Sdk.Serialization.Tests/DummyReceiveServerObjectManager.cs b/tests/Speckle.Sdk.Serialization.Tests/DummyReceiveServerObjectManager.cs index 213bc391..797d7b67 100644 --- a/tests/Speckle.Sdk.Serialization.Tests/DummyReceiveServerObjectManager.cs +++ b/tests/Speckle.Sdk.Serialization.Tests/DummyReceiveServerObjectManager.cs @@ -2,6 +2,7 @@ using System.Text; using Speckle.Sdk.Dependencies.Serialization; using Speckle.Sdk.Serialisation.V2; +using Speckle.Sdk.Serialisation.V2.Send; using Speckle.Sdk.Transports; namespace Speckle.Sdk.Serialization.Tests; @@ -9,7 +10,7 @@ namespace Speckle.Sdk.Serialization.Tests; public class DummyReceiveServerObjectManager(Dictionary objects) : IServerObjectManager { public async IAsyncEnumerable<(string, string)> DownloadObjects( - IReadOnlyList objectIds, + IReadOnlyCollection objectIds, IProgress? progress, [EnumeratorCancellation] CancellationToken cancellationToken ) @@ -32,7 +33,7 @@ public class DummyReceiveServerObjectManager(Dictionary objects) } public Task> HasObjects( - IReadOnlyList objectIds, + IReadOnlyCollection objectIds, CancellationToken cancellationToken ) => throw new NotImplementedException(); @@ -46,7 +47,7 @@ public class DummyReceiveServerObjectManager(Dictionary objects) long totalBytes = 0; foreach (var item in objects) { - totalBytes += Encoding.Default.GetByteCount(item.Json); + totalBytes += Encoding.Default.GetByteCount(item.Json.Value); } progress?.Report(new(ProgressEvent.UploadBytes, totalBytes, totalBytes)); diff --git a/tests/Speckle.Sdk.Serialization.Tests/DummySendServerObjectManager.cs b/tests/Speckle.Sdk.Serialization.Tests/DummySendServerObjectManager.cs index 1dbaefcd..0d8433fc 100644 --- a/tests/Speckle.Sdk.Serialization.Tests/DummySendServerObjectManager.cs +++ b/tests/Speckle.Sdk.Serialization.Tests/DummySendServerObjectManager.cs @@ -4,6 +4,7 @@ using Speckle.Newtonsoft.Json.Linq; using Speckle.Sdk.Common; using Speckle.Sdk.Dependencies.Serialization; using Speckle.Sdk.Serialisation.V2; +using Speckle.Sdk.Serialisation.V2.Send; using Speckle.Sdk.Transports; namespace Speckle.Sdk.Serialization.Tests; @@ -11,7 +12,7 @@ namespace Speckle.Sdk.Serialization.Tests; public class DummySendServerObjectManager(ConcurrentDictionary savedObjects) : IServerObjectManager { public IAsyncEnumerable<(string, string)> DownloadObjects( - IReadOnlyList objectIds, + IReadOnlyCollection objectIds, IProgress? progress, CancellationToken cancellationToken ) => throw new NotImplementedException(); @@ -22,7 +23,10 @@ public class DummySendServerObjectManager(ConcurrentDictionary s CancellationToken cancellationToken ) => throw new NotImplementedException(); - public Task> HasObjects(IReadOnlyList objectIds, CancellationToken cancellationToken) + public Task> HasObjects( + IReadOnlyCollection objectIds, + CancellationToken cancellationToken + ) { return Task.FromResult(objectIds.Distinct().ToDictionary(x => x, savedObjects.ContainsKey)); } @@ -36,7 +40,7 @@ public class DummySendServerObjectManager(ConcurrentDictionary s { foreach (var obj in objects) { - savedObjects.TryAdd(obj.Id, obj.Json); + savedObjects.TryAdd(obj.Id.Value, obj.Json.Value); } return Task.CompletedTask; } diff --git a/tests/Speckle.Sdk.Serialization.Tests/ExternalIdTests.cs b/tests/Speckle.Sdk.Serialization.Tests/ExternalIdTests.cs index 2c0c5014..925d73f8 100644 --- a/tests/Speckle.Sdk.Serialization.Tests/ExternalIdTests.cs +++ b/tests/Speckle.Sdk.Serialization.Tests/ExternalIdTests.cs @@ -26,7 +26,10 @@ public class ExternalIdTests public void ExternalIdTest_Detached(string lineId, string valueId) { var p = new Polyline() { units = "cm", value = [1, 2] }; - var serializer = new ObjectSerializer(new BasePropertyGatherer(), new Dictionary(), true); + using var serializer = new ObjectSerializerFactory(new BasePropertyGatherer()).Create( + new Dictionary(), + default + ); var list = serializer.Serialize(p).ToDictionary(x => x.Item1, x => x.Item2); list.ContainsKey(new Id(lineId)).ShouldBeTrue(); var json = list[new Id(lineId)]; @@ -53,7 +56,10 @@ public class ExternalIdTests knots = [], weights = [], }; - var serializer = new ObjectSerializer(new BasePropertyGatherer(), new Dictionary(), true); + using var serializer = new ObjectSerializerFactory(new BasePropertyGatherer()).Create( + new Dictionary(), + default + ); var list = serializer.Serialize(curve).ToDictionary(x => x.Item1, x => x.Item2); list.ContainsKey(new Id(lineId)).ShouldBeTrue(); var json = list[new Id(lineId)]; @@ -81,7 +87,10 @@ public class ExternalIdTests weights = [], }; var polycurve = new Polycurve() { segments = [curve], units = "cm" }; - var serializer = new ObjectSerializer(new BasePropertyGatherer(), new Dictionary(), true); + using var serializer = new ObjectSerializerFactory(new BasePropertyGatherer()).Create( + new Dictionary(), + default + ); var list = serializer.Serialize(polycurve).ToDictionary(x => x.Item1, x => x.Item2); list.ContainsKey(new Id(lineId)).ShouldBeTrue(); var json = list[new Id(lineId)]; @@ -111,7 +120,10 @@ public class ExternalIdTests var polycurve = new Polycurve() { segments = [curve], units = "cm" }; var @base = new Base(); @base.SetDetachedProp("profile", polycurve); - var serializer = new ObjectSerializer(new BasePropertyGatherer(), new Dictionary(), true); + using var serializer = new ObjectSerializerFactory(new BasePropertyGatherer()).Create( + new Dictionary(), + default + ); var list = serializer.Serialize(@base).ToDictionary(x => x.Item1, x => x.Item2); list.ContainsKey(new Id(lineId)).ShouldBeTrue(); var json = list[new Id(lineId)];