Batch by size, Closures and Detached items are computed differently (#164)

* Can debug dependencies

* Different exceptions

* Uses root id only after we found it to signal the end

* DataChunks are created later and need to be accounted for

* format

* use app ids in tests and references

* check sqlite cache after serialize

* use dummy to go through channels to end

* fmt

* Extend channel lib to batch by size

* fmt

* build fix

* adjust limits

* FIx sending

* Optimize reference generation

* more

* remove tolist

* rework closures to be constant and serializer only deals with current....references bases are cached

* fix chunk creation

* another bug fix

* clean up with factories

* add deserializer factory

* Needed to reference interface

* move around streamId

* some clean up

* Use StringBuilder pool on serialization to reduce memory pressure

* remove extra

* remove extra clears

* Fix a flaw in batchsize

* use default complete

* format

* loader should use 1 writer that is batched

* remove redundant ref gen

* Fix graphql commands by adding project id
This commit is contained in:
Adam Hathcock
2024-11-12 14:25:49 +00:00
committed by GitHub
parent 43445bc4ff
commit 715bb7274a
33 changed files with 397 additions and 328 deletions
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Common;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Models;
@@ -6,17 +7,19 @@ using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Serialisation.V2.Send;
public record SerializeProcessOptions(bool SkipCache, bool SkipServer);
public record SerializeProcessOptions(bool SkipCacheRead, bool SkipCacheWrite, bool SkipServer);
[GenerateAutoInterface]
public class SerializeProcess(
IProgress<ProgressArgs>? progress,
ISQLiteSendCacheManager sqliteSendCacheManager,
IServerObjectManager serverObjectManager,
ISpeckleBaseChildFinder speckleBaseChildFinder,
ISpeckleBasePropertyGatherer speckleBasePropertyGatherer
) : ChannelSaver
IBaseChildFinder baseChildFinder,
IObjectSerializerFactory objectSerializerFactory
) : ChannelSaver, ISerializeProcess
{
private readonly ConcurrentDictionary<string, string> _jsonCache = new();
private readonly ConcurrentDictionary<Base, (string, Dictionary<string, int>)> _baseCache = new();
private readonly ConcurrentDictionary<string, ObjectReference> _objectReferences = new();
private long _totalFound;
@@ -25,26 +28,25 @@ public class SerializeProcess(
private long _cached;
private long _serialized;
private SerializeProcessOptions _options = new(false, false);
private SerializeProcessOptions _options = new(false, false, false);
public async Task<(string rootObjId, IReadOnlyDictionary<string, ObjectReference> convertedReferences)> Serialize(
string streamId,
Base root,
CancellationToken cancellationToken,
SerializeProcessOptions? options = null
)
{
_options = options ?? _options;
var channelTask = Start(streamId, cancellationToken);
var channelTask = Start(cancellationToken);
await Traverse(root, true, cancellationToken).ConfigureAwait(false);
await channelTask.ConfigureAwait(false);
return (root.id, _objectReferences);
}
private async Task<List<Dictionary<string, int>>> Traverse(Base obj, bool isEnd, CancellationToken cancellationToken)
private async Task Traverse(Base obj, bool isEnd, CancellationToken cancellationToken)
{
var tasks = new List<Task<List<Dictionary<string, int>>>>();
foreach (var child in speckleBaseChildFinder.GetChildren(obj))
var tasks = new List<Task>();
foreach (var child in baseChildFinder.GetChildren(obj))
{
Interlocked.Increment(ref _totalFound);
progress?.Report(new(ProgressEvent.FindingChildren, _totalFound, null));
@@ -65,19 +67,8 @@ public class SerializeProcess(
{
await Task.WhenAll(tasks).ConfigureAwait(false);
}
var closures = tasks
.Select(t => t.Result)
.Aggregate(
new List<Dictionary<string, int>>(),
(a, s) =>
{
a.AddRange(s);
return a;
}
)
.ToList();
var items = Serialise(obj, closures);
var items = Serialise(obj, cancellationToken);
foreach (var item in items)
{
Interlocked.Increment(ref _serialized);
@@ -93,18 +84,17 @@ public class SerializeProcess(
{
await Done().ConfigureAwait(false);
}
return closures;
}
//leave this sync
private IEnumerable<BaseItem> Serialise(Base obj, List<Dictionary<string, int>> childClosures)
private IEnumerable<BaseItem> Serialise(Base obj, CancellationToken cancellationToken)
{
if (obj.id != null && _jsonCache.ContainsKey(obj.id))
{
yield break;
}
if (!_options.SkipCache && obj.id != null)
if (!_options.SkipCacheRead && obj.id != null)
{
var cachedJson = sqliteSendCacheManager.GetObject(obj.id);
if (cachedJson != null)
@@ -116,7 +106,7 @@ public class SerializeProcess(
var id = obj.id;
if (id is null || !_jsonCache.TryGetValue(id, out var json))
{
SpeckleObjectSerializer2 serializer2 = new(speckleBasePropertyGatherer, childClosures, true);
var serializer2 = objectSerializerFactory.Create(_baseCache, cancellationToken);
var items = serializer2.Serialize(obj).ToList();
foreach (var kvp in serializer2.ObjectReferences)
{
@@ -149,7 +139,7 @@ public class SerializeProcess(
private BaseItem CheckCache(string id, string json)
{
if (!_options.SkipCache)
if (!_options.SkipCacheRead)
{
var cachedJson = sqliteSendCacheManager.GetObject(id);
if (cachedJson != null)
@@ -160,38 +150,23 @@ public class SerializeProcess(
return new BaseItem(id, json, true);
}
public override async Task<List<BaseItem>> SendToServer(
string streamId,
List<BaseItem> batch,
CancellationToken cancellationToken
)
public override async Task<List<BaseItem>> SendToServer(List<BaseItem> batch, CancellationToken cancellationToken)
{
if (batch.Count == 0)
if (!_options.SkipServer && batch.Count != 0)
{
progress?.Report(new(ProgressEvent.UploadedObjects, _uploaded, _totalToUpload));
return batch;
}
if (!_options.SkipServer)
{
await serverObjectManager.UploadObjects(streamId, batch, true, progress, cancellationToken).ConfigureAwait(false);
await serverObjectManager.UploadObjects(batch, true, progress, cancellationToken).ConfigureAwait(false);
Interlocked.Exchange(ref _uploaded, _uploaded + batch.Count);
progress?.Report(new(ProgressEvent.UploadedObjects, _uploaded, _totalToUpload));
}
return batch;
}
public override void SaveToCache(List<BaseItem> items)
public override void SaveToCache(List<BaseItem> batch)
{
if (!_options.SkipCache)
if (!_options.SkipCacheWrite && batch.Count != 0)
{
if (items.Count == 0)
{
progress?.Report(new(ProgressEvent.CachedToLocal, _cached, null));
return;
}
sqliteSendCacheManager.SaveObjects(items);
Interlocked.Exchange(ref _cached, _cached + items.Count);
sqliteSendCacheManager.SaveObjects(batch);
Interlocked.Exchange(ref _cached, _cached + batch.Count);
progress?.Report(new(ProgressEvent.CachedToLocal, _cached, null));
}
}