Closures are kept for children instead of global (#189)

* disable channels when skipping things

* pass child closures to current.  Current closures out to parent.

* fix build

* adjust options

* use a dictionary pool and pool correctly

* add pools for data chunks

* format
This commit is contained in:
Adam Hathcock
2024-12-13 11:00:21 +00:00
committed by GitHub
parent 722df50d34
commit defcee165a
15 changed files with 306 additions and 146 deletions
+27 -1
View File
@@ -21,5 +21,31 @@ public static class Pools
public static Pool<StringBuilder> StringBuilders { get; } =
new(new StringBuilderPooledObjectPolicy() { MaximumRetainedCapacity = 100 * 1024 * 1024 });
public static Pool<List<T>> CreateListPool<T>() => new(new DefaultPooledObjectPolicy<List<T>>());
private sealed class ObjectDictionaryPolicy<TKey, TValue> : IPooledObjectPolicy<Dictionary<TKey, TValue>>
where TKey : notnull
{
public Dictionary<TKey, TValue> Create() => new(50);
public bool Return(Dictionary<TKey, TValue> obj)
{
obj.Clear();
return true;
}
}
private sealed class ObjectListPolicy<T> : IPooledObjectPolicy<List<T>>
{
public List<T> Create() => new(50);
public bool Return(List<T> obj)
{
obj.Clear();
return true;
}
}
public static Pool<List<T>> CreateListPool<T>() => new(new ObjectListPolicy<T>());
public static Pool<Dictionary<TKey, TValue>> CreateDictionaryPool<TKey, TValue>()
where TKey : notnull => new(new ObjectDictionaryPolicy<TKey, TValue>());
}
@@ -1,18 +1,18 @@
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Dependencies.Serialization;
namespace Speckle.Sdk.Serialisation.V2.Send;
public static class ChannelExtensions
{
public static BatchingChannelReader<BaseItem, List<BaseItem>> BatchBySize(
this ChannelReader<BaseItem> source,
public static BatchingChannelReader<T, List<T>> BatchBySize<T>(
this ChannelReader<T> source,
int batchSize,
bool singleReader = false,
bool allowSynchronousContinuations = false
) =>
new SizeBatchingChannelReader(
)
where T : IHasSize =>
new SizeBatchingChannelReader<T>(
source ?? throw new ArgumentNullException(nameof(source)),
batchSize,
singleReader,
@@ -2,7 +2,7 @@
namespace Speckle.Sdk.Dependencies.Serialization;
public abstract class ChannelLoader
public abstract class ChannelLoader<T>
{
private const int HTTP_GET_CHUNK_SIZE = 500;
private const int MAX_PARALLELISM_HTTP = 4;
@@ -27,7 +27,7 @@ public abstract class ChannelLoader
public abstract string? CheckCache(string id);
public abstract Task<List<BaseItem>> Download(List<string?> ids);
public abstract Task<List<T>> Download(List<string?> ids);
public abstract void SaveToCache(List<BaseItem> x);
public abstract void SaveToCache(List<T> x);
}
@@ -1,27 +1,11 @@
using System.Text;
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Serialisation.V2.Send;
namespace Speckle.Sdk.Dependencies.Serialization;
public readonly record struct BaseItem(string Id, string Json, bool NeedsStorage)
{
public int Size { get; } = Encoding.UTF8.GetByteCount(Json);
public bool Equals(BaseItem? other)
{
if (other is null)
{
return false;
}
return string.Equals(Id, other.Value.Id, StringComparison.OrdinalIgnoreCase);
}
public override int GetHashCode() => Id.GetHashCode();
}
public abstract class ChannelSaver
public abstract class ChannelSaver<T>
where T : IHasSize
{
private const int SEND_CAPACITY = 50;
private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes
@@ -31,7 +15,9 @@ public abstract class ChannelSaver
private const int MAX_CACHE_WRITE_PARALLELISM = 1;
private const int MAX_CACHE_BATCH = 200;
private readonly Channel<BaseItem> _checkCacheChannel = Channel.CreateBounded<BaseItem>(
private bool _enabled;
private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
new BoundedChannelOptions(SEND_CAPACITY)
{
AllowSynchronousContinuations = true,
@@ -43,35 +29,60 @@ public abstract class ChannelSaver
_ => throw new NotImplementedException("Dropping items not supported.")
);
public Task Start(CancellationToken cancellationToken = default)
public Task<long> Start(
bool enableServerSending = true,
bool enableCacheSaving = true,
CancellationToken cancellationToken = default
)
{
var t = _checkCacheChannel
.Reader.BatchBySize(HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x => await SendToServer(x, cancellationToken).ConfigureAwait(false),
HTTP_CAPACITY,
false,
cancellationToken
)
.Join()
.Batch(MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken);
return t;
ValueTask<long> t = new(Task.FromResult(0L));
if (enableServerSending)
{
_enabled = true;
var tChannelReader = _checkCacheChannel
.Reader.BatchBySize(HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x => await SendToServer(x, cancellationToken).ConfigureAwait(false),
HTTP_CAPACITY,
false,
cancellationToken
);
if (enableCacheSaving)
{
t = new(
tChannelReader
.Join()
.Batch(MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
);
}
else
{
t = tChannelReader.ReadUntilCancelledAsync(cancellationToken, (list, l) => new ValueTask());
}
}
return t.AsTask();
}
public async Task Save(BaseItem item, CancellationToken cancellationToken = default) =>
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
public async ValueTask Save(T item, CancellationToken cancellationToken = default)
{
if (_enabled)
{
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
}
}
public abstract Task<List<BaseItem>> SendToServer(List<BaseItem> batch, CancellationToken cancellationToken);
public abstract Task<List<T>> SendToServer(List<T> batch, CancellationToken cancellationToken);
public Task Done()
public ValueTask Done()
{
_checkCacheChannel.Writer.Complete();
return Task.CompletedTask;
return new(Task.CompletedTask);
}
public abstract void SaveToCache(List<BaseItem> item);
public abstract void SaveToCache(List<T> item);
}
@@ -1,28 +1,33 @@
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Dependencies.Serialization;
namespace Speckle.Sdk.Serialisation.V2.Send;
public class SizeBatchingChannelReader(
ChannelReader<BaseItem> source,
public interface IHasSize
{
int Size { get; }
}
public class SizeBatchingChannelReader<T>(
ChannelReader<T> source,
int batchSize,
bool singleReader,
bool syncCont = false
) : BatchingChannelReader<BaseItem, List<BaseItem>>(source, batchSize, singleReader, syncCont)
) : BatchingChannelReader<T, List<T>>(source, batchSize, singleReader, syncCont)
where T : IHasSize
{
private readonly int _batchSize = batchSize;
protected override List<BaseItem> CreateBatch(int capacity) => new();
protected override List<T> CreateBatch(int capacity) => new();
protected override void TrimBatch(List<BaseItem> batch) => batch.TrimExcess();
protected override void TrimBatch(List<T> batch) => batch.TrimExcess();
protected override void AddBatchItem(List<BaseItem> batch, BaseItem item) => batch.Add(item);
protected override void AddBatchItem(List<T> batch, T item) => batch.Add(item);
protected override int GetBatchSize(List<BaseItem> batch)
protected override int GetBatchSize(List<T> batch)
{
int size = 0;
foreach (BaseItem item in batch)
foreach (T item in batch)
{
size += item.Size;
}
@@ -1,5 +1,5 @@
using System.Text;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.SQLite;
using Speckle.Sdk.Transports;
@@ -23,7 +23,7 @@ public class DummySqLiteJsonCacheManager : ISqLiteJsonCacheManager
public class DummySendServerObjectManager : IServerObjectManager
{
public IAsyncEnumerable<(string, string)> DownloadObjects(
IReadOnlyList<string> objectIds,
IReadOnlyCollection<string> objectIds,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
) => throw new NotImplementedException();
@@ -35,7 +35,7 @@ public class DummySendServerObjectManager : IServerObjectManager
) => throw new NotImplementedException();
public Task<Dictionary<string, bool>> HasObjects(
IReadOnlyList<string> objectIds,
IReadOnlyCollection<string> objectIds,
CancellationToken cancellationToken
) => throw new NotImplementedException();
@@ -49,7 +49,7 @@ public class DummySendServerObjectManager : IServerObjectManager
long totalBytes = 0;
foreach (var item in objects)
{
totalBytes += Encoding.Default.GetByteCount(item.Json);
totalBytes += Encoding.Default.GetByteCount(item.Json.Value);
}
progress?.Report(new(ProgressEvent.UploadBytes, totalBytes, totalBytes));
@@ -3,6 +3,7 @@ using Speckle.Sdk.Common;
using Speckle.Sdk.Dependencies;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Serialisation.Utilities;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.SQLite;
using Speckle.Sdk.Transports;
@@ -13,7 +14,7 @@ public sealed class ObjectLoader(
ISqLiteJsonCacheManager sqLiteJsonCacheManager,
IServerObjectManager serverObjectManager,
IProgress<ProgressArgs>? progress
) : ChannelLoader, IObjectLoader
) : ChannelLoader<BaseItem>, IObjectLoader
{
private int? _allChildrenCount;
private long _checkCache;
@@ -80,13 +81,13 @@ public sealed class ObjectLoader(
var (id, json) in serverObjectManager.DownloadObjects(ids.Select(x => x.NotNull()).ToList(), progress, default)
)
{
toCache.Add(new(id, json, true));
toCache.Add(new(new(id), new(json), true, null));
}
if (toCache.Count != ids.Count)
{
throw new SpeckleException(
$"Objects in batch missing: {string.Join(",", ids.Except(toCache.Select(y => y.Id)).Take(10))}"
$"Objects in batch missing: {string.Join(",", ids.Except(toCache.Select(y => y.Id.Value)).Take(10))}"
);
}
return toCache;
@@ -97,7 +98,7 @@ public sealed class ObjectLoader(
{
if (!_options.SkipCache)
{
sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id, x.Json)));
sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id.Value, x.Json.Value)));
Interlocked.Exchange(ref _cached, _cached + batch.Count);
progress?.Report(new(ProgressEvent.CachedToLocal, _cached, _allChildrenCount));
}
@@ -9,19 +9,25 @@ using Speckle.Sdk.Dependencies;
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation.Utilities;
using Closures = System.Collections.Generic.IReadOnlyDictionary<Speckle.Sdk.Serialisation.Id, int>;
using Closures = System.Collections.Generic.Dictionary<Speckle.Sdk.Serialisation.Id, int>;
namespace Speckle.Sdk.Serialisation.V2.Send;
public readonly record struct CacheInfo(Json Json, Closures Closures);
public readonly record struct NodeInfo(Json Json, Closures? C)
{
public Closures GetClosures() =>
C ?? ClosureParser.GetClosures(Json.Value).ToDictionary(x => new Id(x.Item1), x => x.Item2);
}
public partial interface IObjectSerializer : IDisposable;
[GenerateAutoInterface]
public class ObjectSerializer : IObjectSerializer
public sealed class ObjectSerializer : IObjectSerializer
{
private HashSet<object> _parentObjects = new();
private readonly Dictionary<Id, int> _currentClosures = new();
private readonly IDictionary<Base, CacheInfo> _baseCache;
private readonly IReadOnlyDictionary<Id, NodeInfo> _childCache;
private readonly bool _trackDetachedChildren;
private readonly IBasePropertyGatherer _propertyGatherer;
@@ -33,7 +39,14 @@ public class ObjectSerializer : IObjectSerializer
/// </summary>
public Dictionary<Id, ObjectReference> ObjectReferences { get; } = new();
private readonly List<(Id, Json)> _chunks = new();
private readonly List<(Id, Json, Closures)> _chunks;
private readonly Pool<List<(Id, Json, Closures)>> _chunksPool;
private readonly List<List<DataChunk>> _chunks2 = new();
private readonly Pool<List<DataChunk>> _chunks2Pool;
private readonly List<List<object?>> _chunks3 = new();
private readonly Pool<List<object?>> _chunks3Pool;
/// <summary>
/// Creates a new Serializer instance.
@@ -42,22 +55,43 @@ public class ObjectSerializer : IObjectSerializer
/// <param name="cancellationToken"></param>
public ObjectSerializer(
IBasePropertyGatherer propertyGatherer,
IDictionary<Base, CacheInfo> baseCache,
IReadOnlyDictionary<Id, NodeInfo> childCache,
Pool<List<(Id, Json, Closures)>> chunksPool,
Pool<List<DataChunk>> chunks2Pool,
Pool<List<object?>> chunks3Pool,
bool trackDetachedChildren = false,
CancellationToken cancellationToken = default
)
{
_propertyGatherer = propertyGatherer;
_baseCache = baseCache;
_childCache = childCache;
_chunksPool = chunksPool;
_chunks2Pool = chunks2Pool;
_chunks3Pool = chunks3Pool;
_cancellationToken = cancellationToken;
_trackDetachedChildren = trackDetachedChildren;
_chunks = chunksPool.Get();
}
[AutoInterfaceIgnore]
public void Dispose()
{
_chunksPool.Return(_chunks);
foreach (var c2 in _chunks2)
{
_chunks2Pool.Return(c2);
}
foreach (var c3 in _chunks3)
{
_chunks3Pool.Return(c3);
}
}
/// <param name="baseObj">The object to serialize</param>
/// <returns>The serialized JSON</returns>
/// <exception cref="InvalidOperationException">The serializer is busy (already serializing an object)</exception>
/// <exception cref="SpeckleSerializeException">Failed to extract (pre-serialize) properties from the <paramref name="baseObj"/></exception>
public IEnumerable<(Id, Json)> Serialize(Base baseObj)
public IEnumerable<(Id, Json, Closures)> Serialize(Base baseObj)
{
try
{
@@ -70,8 +104,7 @@ public class ObjectSerializer : IObjectSerializer
{
throw new SpeckleSerializeException($"Failed to extract (pre-serialize) properties from the {baseObj}", ex);
}
_baseCache[baseObj] = new(item.Item2, _currentClosures);
yield return (item.Item1, item.Item2);
yield return (item.Item1, item.Item2, _currentClosures);
foreach (var chunk in _chunks)
{
yield return chunk;
@@ -232,27 +265,6 @@ public class ObjectSerializer : IObjectSerializer
{
return null;
}
Closures childClosures;
Id id;
Json json;
//avoid multiple serialization to get closures
if (_baseCache.TryGetValue(baseObj, out var info))
{
id = new Id(baseObj.id.NotNull());
childClosures = info.Closures;
json = info.Json;
MergeClosures(_currentClosures, childClosures);
}
else
{
childClosures = isRoot || inheritedDetachInfo.IsDetachable ? _currentClosures : [];
var sb = Pools.StringBuilders.Get();
using var writer = new StringWriter(sb);
using var jsonWriter = SpeckleObjectSerializerPool.Instance.GetJsonTextWriter(writer);
id = SerializeBaseObject(baseObj, jsonWriter, childClosures);
json = new Json(writer.ToString());
Pools.StringBuilders.Return(sb);
}
_parentObjects.Remove(baseObj);
@@ -266,6 +278,27 @@ public class ObjectSerializer : IObjectSerializer
if (inheritedDetachInfo.IsDetachable)
{
Closures childClosures;
Id id;
Json json;
//avoid multiple serialization to get closures
if (baseObj.id != null && _childCache.TryGetValue(new(baseObj.id), out var info))
{
id = new Id(baseObj.id);
childClosures = info.GetClosures();
json = info.Json;
MergeClosures(_currentClosures, childClosures);
}
else
{
childClosures = isRoot || inheritedDetachInfo.IsDetachable ? _currentClosures : [];
var sb = Pools.StringBuilders.Get();
using var writer = new StringWriter(sb);
using var jsonWriter = SpeckleObjectSerializerPool.Instance.GetJsonTextWriter(writer);
id = SerializeBaseObject(baseObj, jsonWriter, childClosures);
json = new Json(writer.ToString());
Pools.StringBuilders.Return(sb);
}
var json2 = ReferenceGenerator.CreateReference(id);
AddClosure(id);
// add to obj refs to return
@@ -278,10 +311,20 @@ public class ObjectSerializer : IObjectSerializer
closure = childClosures.ToDictionary(x => x.Key.Value, x => x.Value),
};
}
_chunks.Add(new(id, json));
_chunks.Add(new(id, json, []));
return new(id, json2);
}
return new(id, json);
else
{
var childClosures = isRoot || inheritedDetachInfo.IsDetachable ? _currentClosures : [];
var sb = Pools.StringBuilders.Get();
using var writer = new StringWriter(sb);
using var jsonWriter = SpeckleObjectSerializerPool.Instance.GetJsonTextWriter(writer);
var id = SerializeBaseObject(baseObj, jsonWriter, childClosures);
var json = new Json(writer.ToString());
Pools.StringBuilders.Return(sb);
return new(id, json);
}
}
private Id SerializeBaseObject(Base baseObj, JsonWriter writer, Closures closure)
@@ -334,12 +377,21 @@ public class ObjectSerializer : IObjectSerializer
return id;
}
private List<object?> GetChunk()
{
var chunk = _chunks3Pool.Get();
_chunks3.Add(chunk);
return chunk;
}
private void SerializeOrChunkProperty(object? baseValue, JsonWriter jsonWriter, PropertyAttributeInfo detachInfo)
{
if (baseValue is IEnumerable chunkableCollection && detachInfo.IsChunkable)
{
List<DataChunk> chunks = new();
DataChunk crtChunk = new() { data = new List<object?>(detachInfo.ChunkSize) };
List<DataChunk> chunks = _chunks2Pool.Get();
_chunks2.Add(chunks);
DataChunk crtChunk = new() { data = GetChunk() };
foreach (object element in chunkableCollection)
{
@@ -347,7 +399,7 @@ public class ObjectSerializer : IObjectSerializer
if (crtChunk.data.Count >= detachInfo.ChunkSize)
{
chunks.Add(crtChunk);
crtChunk = new DataChunk { data = new List<object?>(detachInfo.ChunkSize) };
crtChunk = new DataChunk { data = GetChunk() };
}
}
@@ -1,11 +1,17 @@
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Dependencies;
using Speckle.Sdk.Models;
using Closures = System.Collections.Generic.Dictionary<Speckle.Sdk.Serialisation.Id, int>;
namespace Speckle.Sdk.Serialisation.V2.Send;
[GenerateAutoInterface]
public class ObjectSerializerFactory(IBasePropertyGatherer propertyGatherer) : IObjectSerializerFactory
{
public IObjectSerializer Create(IDictionary<Base, CacheInfo> baseCache, CancellationToken cancellationToken) =>
new ObjectSerializer(propertyGatherer, baseCache, true, cancellationToken);
private readonly Pool<List<(Id, Json, Closures)>> _chunkPool = Pools.CreateListPool<(Id, Json, Closures)>();
private readonly Pool<List<DataChunk>> _chunk2Pool = Pools.CreateListPool<DataChunk>();
private readonly Pool<List<object?>> _chunk3Pool = Pools.CreateListPool<object?>();
public IObjectSerializer Create(IReadOnlyDictionary<Id, NodeInfo> baseCache, CancellationToken cancellationToken) =>
new ObjectSerializer(propertyGatherer, baseCache, _chunkPool, _chunk2Pool, _chunk3Pool, true, cancellationToken);
}
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Text;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Common;
using Speckle.Sdk.Dependencies;
@@ -6,14 +7,17 @@ using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Models;
using Speckle.Sdk.SQLite;
using Speckle.Sdk.Transports;
using Closures = System.Collections.Generic.Dictionary<Speckle.Sdk.Serialisation.Id, int>;
namespace Speckle.Sdk.Serialisation.V2.Send;
public record SerializeProcessOptions(
bool SkipCacheRead,
bool SkipCacheWrite,
bool SkipServer,
bool SkipFindTotalObjects
bool SkipCacheRead = false,
bool SkipCacheWrite = false,
bool SkipServer = false,
bool SkipFindTotalObjects = false,
bool EnableServerSending = true,
bool EnableCacheSaving = true
);
public readonly record struct SerializeProcessResults(
@@ -21,6 +25,22 @@ public readonly record struct SerializeProcessResults(
IReadOnlyDictionary<Id, ObjectReference> ConvertedReferences
);
public readonly record struct BaseItem(Id Id, Json Json, bool NeedsStorage, Closures? Closures) : IHasSize
{
public int Size { get; } = Encoding.UTF8.GetByteCount(Json.Value);
public bool Equals(BaseItem? other)
{
if (other is null)
{
return false;
}
return string.Equals(Id.Value, other.Value.Id.Value, StringComparison.OrdinalIgnoreCase);
}
public override int GetHashCode() => Id.GetHashCode();
}
[GenerateAutoInterface]
public class SerializeProcess(
IProgress<ProgressArgs>? progress,
@@ -29,14 +49,13 @@ public class SerializeProcess(
IBaseChildFinder baseChildFinder,
IObjectSerializerFactory objectSerializerFactory,
SerializeProcessOptions? options = null
) : ChannelSaver, ISerializeProcess
) : ChannelSaver<BaseItem>, ISerializeProcess
{
private readonly SerializeProcessOptions _options = options ?? new(false, false, false, false);
//cache bases and closure info to avoid reserialization
private readonly IDictionary<Base, CacheInfo> _baseCache = new ConcurrentDictionary<Base, CacheInfo>();
private readonly ConcurrentDictionary<Id, ObjectReference> _objectReferences = new();
private readonly Pool<List<(Id, Json)>> _pool = Pools.CreateListPool<(Id, Json)>();
private readonly Pool<List<(Id, Json, Closures)>> _pool = Pools.CreateListPool<(Id, Json, Closures)>();
private readonly Pool<Dictionary<Id, NodeInfo>> _childClosurePool = Pools.CreateDictionaryPool<Id, NodeInfo>();
private long _objectCount;
private long _objectsFound;
@@ -48,7 +67,7 @@ public class SerializeProcess(
public async Task<SerializeProcessResults> Serialize(Base root, CancellationToken cancellationToken)
{
var channelTask = Start(cancellationToken);
var channelTask = Start(_options.EnableServerSending, _options.EnableCacheSaving, cancellationToken);
var findTotalObjectsTask = Task.CompletedTask;
if (!_options.SkipFindTotalObjects)
{
@@ -76,9 +95,9 @@ public class SerializeProcess(
}
}
private async Task Traverse(Base obj, bool isEnd, CancellationToken cancellationToken)
private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj, bool isEnd, CancellationToken cancellationToken)
{
var tasks = new List<Task>();
var tasks = new List<Task<Dictionary<Id, NodeInfo>>>();
foreach (var child in baseChildFinder.GetChildren(obj))
{
// tmp is necessary because of the way closures close over loop variables
@@ -98,8 +117,19 @@ public class SerializeProcess(
{
await Task.WhenAll(tasks).ConfigureAwait(false);
}
var childClosures = _childClosurePool.Get();
foreach (var t in tasks)
{
var childClosure = t.Result;
foreach (var kvp in childClosure)
{
childClosures[kvp.Key] = kvp.Value;
}
}
var items = Serialise(obj, cancellationToken);
var items = Serialise(obj, childClosures, cancellationToken);
var currentClosures = new Dictionary<Id, NodeInfo>();
Interlocked.Increment(ref _objectCount);
progress?.Report(new(ProgressEvent.FromCacheOrSerialized, _objectCount, _objectsFound));
foreach (var item in items)
@@ -108,28 +138,40 @@ public class SerializeProcess(
{
await Save(item, cancellationToken).ConfigureAwait(false);
}
if (!currentClosures.ContainsKey(item.Id))
{
currentClosures.Add(item.Id, new NodeInfo(item.Json, item.Closures));
}
}
_childClosurePool.Return(childClosures);
if (isEnd)
{
await Done().ConfigureAwait(false);
}
return currentClosures;
}
//leave this sync
private IEnumerable<BaseItem> Serialise(Base obj, CancellationToken cancellationToken)
private IEnumerable<BaseItem> Serialise(
Base obj,
IReadOnlyDictionary<Id, NodeInfo> childInfo,
CancellationToken cancellationToken
)
{
if (!_options.SkipCacheRead && obj.id != null)
{
var cachedJson = sqLiteJsonCacheManager.GetObject(obj.id);
if (cachedJson != null)
{
yield return new BaseItem(obj.id.NotNull(), cachedJson, false);
yield return new BaseItem(new(obj.id.NotNull()), new(cachedJson), false, null);
yield break;
}
}
var serializer2 = objectSerializerFactory.Create(_baseCache, cancellationToken);
using var serializer2 = objectSerializerFactory.Create(childInfo, cancellationToken);
var items = _pool.Get();
try
{
@@ -140,11 +182,11 @@ public class SerializeProcess(
_objectReferences.TryAdd(kvp.Key, kvp.Value);
}
var (id, json) = items.First();
yield return CheckCache(id, json);
foreach (var (cid, cJson) in items.Skip(1))
var (id, json, closures) = items.First();
yield return CheckCache(id, json, closures);
foreach (var (cid, cJson, cClosures) in items.Skip(1))
{
yield return CheckCache(cid, cJson);
yield return CheckCache(cid, cJson, cClosures);
}
}
finally
@@ -153,17 +195,17 @@ public class SerializeProcess(
}
}
private BaseItem CheckCache(Id id, Json json)
private BaseItem CheckCache(Id id, Json json, Dictionary<Id, int> closures)
{
if (!_options.SkipCacheRead)
{
var cachedJson = sqLiteJsonCacheManager.GetObject(id.Value);
if (cachedJson != null)
{
return new BaseItem(id.Value, cachedJson, false);
return new BaseItem(id, new(cachedJson), false, null);
}
}
return new BaseItem(id.Value, json.Value, true);
return new BaseItem(id, json, true, closures);
}
public override async Task<List<BaseItem>> SendToServer(List<BaseItem> batch, CancellationToken cancellationToken)
@@ -172,9 +214,9 @@ public class SerializeProcess(
{
var objectBatch = batch.Distinct().ToList();
var hasObjects = await serverObjectManager
.HasObjects(objectBatch.Select(x => x.Id).ToList(), cancellationToken)
.HasObjects(objectBatch.Select(x => x.Id.Value).Freeze(), cancellationToken)
.ConfigureAwait(false);
objectBatch = batch.Where(x => !hasObjects[x.Id]).ToList();
objectBatch = batch.Where(x => !hasObjects[x.Id.Value]).ToList();
if (objectBatch.Count != 0)
{
await serverObjectManager.UploadObjects(objectBatch, true, progress, cancellationToken).ConfigureAwait(false);
@@ -190,7 +232,7 @@ public class SerializeProcess(
{
if (!_options.SkipCacheWrite && batch.Count != 0)
{
sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id, x.Json)));
sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id.Value, x.Json.Value)));
Interlocked.Exchange(ref _cached, _cached + batch.Count);
progress?.Report(new(ProgressEvent.CachedToLocal, _cached, _objectsSerialized));
}
@@ -5,9 +5,9 @@ using System.Text;
using Speckle.InterfaceGenerator;
using Speckle.Newtonsoft.Json;
using Speckle.Sdk.Common;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Logging;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.Transports;
using Speckle.Sdk.Transports.ServerUtils;
@@ -42,7 +42,7 @@ public class ServerObjectManager : IServerObjectManager
}
public async IAsyncEnumerable<(string, string)> DownloadObjects(
IReadOnlyList<string> objectIds,
IReadOnlyCollection<string> objectIds,
IProgress<ProgressArgs>? progress,
[EnumeratorCancellation] CancellationToken cancellationToken
)
@@ -139,7 +139,7 @@ public class ServerObjectManager : IServerObjectManager
}
public async Task<Dictionary<string, bool>> HasObjects(
IReadOnlyList<string> objectIds,
IReadOnlyCollection<string> objectIds,
CancellationToken cancellationToken
)
{
@@ -336,7 +336,7 @@ public class SamplePropBase2 : Base
public class DummyServerObjectManager : IServerObjectManager
{
public IAsyncEnumerable<(string, string)> DownloadObjects(
IReadOnlyList<string> objectIds,
IReadOnlyCollection<string> objectIds,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
) => throw new NotImplementedException();
@@ -348,7 +348,7 @@ public class DummyServerObjectManager : IServerObjectManager
) => throw new NotImplementedException();
public Task<Dictionary<string, bool>> HasObjects(
IReadOnlyList<string> objectIds,
IReadOnlyCollection<string> objectIds,
CancellationToken cancellationToken
) => throw new NotImplementedException();
@@ -362,7 +362,7 @@ public class DummyServerObjectManager : IServerObjectManager
long totalBytes = 0;
foreach (var item in objects)
{
totalBytes += Encoding.Default.GetByteCount(item.Json);
totalBytes += Encoding.Default.GetByteCount(item.Json.Value);
}
progress?.Report(new(ProgressEvent.UploadBytes, totalBytes, totalBytes));
@@ -2,6 +2,7 @@
using System.Text;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Serialisation.V2;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Serialization.Tests;
@@ -9,7 +10,7 @@ namespace Speckle.Sdk.Serialization.Tests;
public class DummyReceiveServerObjectManager(Dictionary<string, string> objects) : IServerObjectManager
{
public async IAsyncEnumerable<(string, string)> DownloadObjects(
IReadOnlyList<string> objectIds,
IReadOnlyCollection<string> objectIds,
IProgress<ProgressArgs>? progress,
[EnumeratorCancellation] CancellationToken cancellationToken
)
@@ -32,7 +33,7 @@ public class DummyReceiveServerObjectManager(Dictionary<string, string> objects)
}
public Task<Dictionary<string, bool>> HasObjects(
IReadOnlyList<string> objectIds,
IReadOnlyCollection<string> objectIds,
CancellationToken cancellationToken
) => throw new NotImplementedException();
@@ -46,7 +47,7 @@ public class DummyReceiveServerObjectManager(Dictionary<string, string> objects)
long totalBytes = 0;
foreach (var item in objects)
{
totalBytes += Encoding.Default.GetByteCount(item.Json);
totalBytes += Encoding.Default.GetByteCount(item.Json.Value);
}
progress?.Report(new(ProgressEvent.UploadBytes, totalBytes, totalBytes));
@@ -4,6 +4,7 @@ using Speckle.Newtonsoft.Json.Linq;
using Speckle.Sdk.Common;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Serialisation.V2;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Serialization.Tests;
@@ -11,7 +12,7 @@ namespace Speckle.Sdk.Serialization.Tests;
public class DummySendServerObjectManager(ConcurrentDictionary<string, string> savedObjects) : IServerObjectManager
{
public IAsyncEnumerable<(string, string)> DownloadObjects(
IReadOnlyList<string> objectIds,
IReadOnlyCollection<string> objectIds,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
) => throw new NotImplementedException();
@@ -22,7 +23,10 @@ public class DummySendServerObjectManager(ConcurrentDictionary<string, string> s
CancellationToken cancellationToken
) => throw new NotImplementedException();
public Task<Dictionary<string, bool>> HasObjects(IReadOnlyList<string> objectIds, CancellationToken cancellationToken)
public Task<Dictionary<string, bool>> HasObjects(
IReadOnlyCollection<string> objectIds,
CancellationToken cancellationToken
)
{
return Task.FromResult(objectIds.Distinct().ToDictionary(x => x, savedObjects.ContainsKey));
}
@@ -36,7 +40,7 @@ public class DummySendServerObjectManager(ConcurrentDictionary<string, string> s
{
foreach (var obj in objects)
{
savedObjects.TryAdd(obj.Id, obj.Json);
savedObjects.TryAdd(obj.Id.Value, obj.Json.Value);
}
return Task.CompletedTask;
}
@@ -26,7 +26,10 @@ public class ExternalIdTests
public void ExternalIdTest_Detached(string lineId, string valueId)
{
var p = new Polyline() { units = "cm", value = [1, 2] };
var serializer = new ObjectSerializer(new BasePropertyGatherer(), new Dictionary<Base, CacheInfo>(), true);
using var serializer = new ObjectSerializerFactory(new BasePropertyGatherer()).Create(
new Dictionary<Id, NodeInfo>(),
default
);
var list = serializer.Serialize(p).ToDictionary(x => x.Item1, x => x.Item2);
list.ContainsKey(new Id(lineId)).ShouldBeTrue();
var json = list[new Id(lineId)];
@@ -53,7 +56,10 @@ public class ExternalIdTests
knots = [],
weights = [],
};
var serializer = new ObjectSerializer(new BasePropertyGatherer(), new Dictionary<Base, CacheInfo>(), true);
using var serializer = new ObjectSerializerFactory(new BasePropertyGatherer()).Create(
new Dictionary<Id, NodeInfo>(),
default
);
var list = serializer.Serialize(curve).ToDictionary(x => x.Item1, x => x.Item2);
list.ContainsKey(new Id(lineId)).ShouldBeTrue();
var json = list[new Id(lineId)];
@@ -81,7 +87,10 @@ public class ExternalIdTests
weights = [],
};
var polycurve = new Polycurve() { segments = [curve], units = "cm" };
var serializer = new ObjectSerializer(new BasePropertyGatherer(), new Dictionary<Base, CacheInfo>(), true);
using var serializer = new ObjectSerializerFactory(new BasePropertyGatherer()).Create(
new Dictionary<Id, NodeInfo>(),
default
);
var list = serializer.Serialize(polycurve).ToDictionary(x => x.Item1, x => x.Item2);
list.ContainsKey(new Id(lineId)).ShouldBeTrue();
var json = list[new Id(lineId)];
@@ -111,7 +120,10 @@ public class ExternalIdTests
var polycurve = new Polycurve() { segments = [curve], units = "cm" };
var @base = new Base();
@base.SetDetachedProp("profile", polycurve);
var serializer = new ObjectSerializer(new BasePropertyGatherer(), new Dictionary<Base, CacheInfo>(), true);
using var serializer = new ObjectSerializerFactory(new BasePropertyGatherer()).Create(
new Dictionary<Id, NodeInfo>(),
default
);
var list = serializer.Serialize(@base).ToDictionary(x => x.Item1, x => x.Item2);
list.ContainsKey(new Id(lineId)).ShouldBeTrue();
var json = list[new Id(lineId)];