Files
speckle-sharp-sdk/src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs
T
Adam Hathcock 4cc78c4bc9 Serialize using a Channel (#146)
* Use a stack channel for deserialization

* multi-threaded

* add object dictionary pool

* more pooling

* adjust sqlite transport

* format

* Optimize IsPropNameValid

* object loader first pass

* save test

* add cache pre check

* save better deserialize

* mostly works

* uses tasks but slower at end

* rework to make more sense

* add check to avoid multi-deserialize

* modify max parallelism

* async enqueuing of tasks

* switch to more asyncenumerable

* fmt

* fmt

* cleanup sqlite

* make ServerObjectManager

* revert change

* add ability to skip cache check

* cache json to know what is loaded

* testing

* clean up usage

* clean up and added new op

* Fix exception handling

* fixing progress

* remove codejam

* Hides ObjectPool dependency

* fmt

* Use the 1.0 BCL async to try to be more compatible

* rename to dependencies

* Move Polly to internal dependencies

* format

* remove more old references

* remove stackchannel

* fixes for registration

* remove console writeline

* add cache check shortcut for root object

* start refactoring send

* receive2 benchmark

* add test for deserialize new

* use channels for sending

* test and fixes

* Use the same AsyncInterfaces as Dynamo. Merge fixes

* clean up

* fix download object progress

* put back from bad merge

* intermediate commit: separating get child function from serializer

* send didn't error

* add channels

* Use net48, netstandard2.1 and net8

* remove collection special case

* have to make a tree of tasks even though it may serialize things twice

* pre-id changing during serialize

* need AsyncInterfaces for net48 :(

* options changes

* revert to netstandard2.0 and net8.0

* fix totals

* revert httpcontext changes

* format

* clean up

* active tasks works when accounting for id not being stable

* add id tests

* more fixes

* works

* format

* Convert to BaseItem and use single SQLite checks to avoid locks

* use locks and batch sqlite operations

* hook up and handle null ids

* remove unused parameter

* remove progress from serializer itself

* invert has objects call

* re-add object references

* format

* fix tests

* remove active tasks check

* bug fix for json cache

* remove locks from sqlite

* General Send test

* add childclosures

* redo extract all to be enumerable

* group tests in projects

* caching json does matter

* cache checking should be managed by channels

* format

* Merge pull request #152 from specklesystems/new-json-test

Uses a new objects test in Revit for serialization tests

* add skip

* add new roundtrip test

* fix finish

* clean up tests

* check happens in serialize...don't do it twice

* better progress reporting

* fix progress reporting

* only use detached properties when children gathering

* move detached tests

* add detached tests

* fix merge

* Fix progress change

* fix more tests

---------

Co-authored-by: Jedd Morgan <45512892+JR-Morgan@users.noreply.github.com>
Co-authored-by: Claire Kuang <kuang.claire@gmail.com>
2024-11-05 09:56:54 +00:00

157 lines
4.4 KiB
C#

using System.Collections.Concurrent;
using Speckle.Sdk.Common;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Models;
using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Serialisation.V2.Send;
/// <summary>
/// Flags that let a <see cref="SerializeProcess"/> run bypass parts of the send pipeline.
/// </summary>
/// <param name="SkipCache">When true, the local SQLite send cache is neither consulted nor written to.</param>
/// <param name="SkipServer">When true, serialized batches are not uploaded to the server.</param>
public record SerializeProcessOptions(
  bool SkipCache,
  bool SkipServer
);
/// <summary>
/// Serializes a <see cref="Base"/> object graph child-first and pushes the resulting JSON items into the
/// <see cref="ChannelSaver"/> pipeline. The pipeline calls back into <see cref="SendToServer"/> and
/// <see cref="SaveToCache"/> (presumably once per batch — see <see cref="ChannelSaver"/>).
/// </summary>
public class SerializeProcess(
  IProgress<ProgressArgs>? progress,
  ISQLiteSendCacheManager sqliteSendCacheManager,
  IServerObjectManager serverObjectManager,
  ISpeckleBaseChildFinder speckleBaseChildFinder,
  ISpeckleBasePropertyGatherer speckleBasePropertyGatherer
) : ChannelSaver
{
  // Serialized JSON keyed by object id. It also marks objects already handled in this run,
  // so each one is only emitted once.
  private readonly ConcurrentDictionary<string, string> _jsonCache = new();

  // Object references gathered from every serializer instance; returned to the caller of Serialize.
  private readonly ConcurrentDictionary<string, ObjectReference> _objectReferences = new();

  // These counters are touched from many tasks concurrently: always read/update them via Interlocked.
  private long _total;
  private long _cached;
  private long _serialized;

  private SerializeProcessOptions _options = new(false, false);

  /// <summary>
  /// Serializes <paramref name="root"/> and all of its children.
  /// Afterwards, awaits the channel task started for <paramref name="streamId"/>.
  /// </summary>
  /// <param name="streamId">Stream id handed to the channel pipeline and to the server upload.</param>
  /// <param name="root">Root of the object graph to serialize.</param>
  /// <param name="cancellationToken">Flows to child tasks, the channel and the upload.</param>
  /// <param name="options">Options for this run; <c>null</c> keeps the previously set options.</param>
  /// <returns>The root object's id and every object reference collected while serializing.</returns>
  public async Task<(string rootObjId, IReadOnlyDictionary<string, ObjectReference> convertedReferences)> Serialize(
    string streamId,
    Base root,
    CancellationToken cancellationToken,
    SerializeProcessOptions? options = null
  )
  {
    _options = options ?? _options;
    var channelTask = Start(streamId, cancellationToken);
    await Traverse(root, true, cancellationToken).ConfigureAwait(false);
    await channelTask.ConfigureAwait(false);
    return (root.id, _objectReferences);
  }

  /// <summary>
  /// Serializes every child of <paramref name="obj"/> (each on its own task), then <paramref name="obj"/> itself.
  /// Items that need storage are queued for saving.
  /// </summary>
  /// <param name="obj">Object whose subtree is serialized.</param>
  /// <param name="isEnd">True only for the root call; <c>Done()</c> is signalled once the root has been handled.</param>
  /// <param name="cancellationToken">Cancellation for child tasks and saving.</param>
  /// <returns>All closure tables returned by the children, concatenated in child order.</returns>
  private async Task<List<Dictionary<string, int>>> Traverse(Base obj, bool isEnd, CancellationToken cancellationToken)
  {
    var tasks = new List<Task<List<Dictionary<string, int>>>>();
    foreach (var child in speckleBaseChildFinder.GetChildren(obj))
    {
      Interlocked.Increment(ref _total);
      // Since C# 5 each foreach iteration declares a fresh 'child', so capturing it directly is safe.
      var t = Task
        .Factory.StartNew(
          () => Traverse(child, false, cancellationToken),
          cancellationToken,
          TaskCreationOptions.AttachedToParent,
          TaskScheduler.Default
        )
        .Unwrap();
      tasks.Add(t);
    }

    // WhenAll over an empty list completes immediately with an empty array.
    var childResults = await Task.WhenAll(tasks).ConfigureAwait(false);
    var closures = childResults.SelectMany(childClosures => childClosures).ToList();

    var item = Serialise(obj, closures);

    // Use Interlocked results so the reported numbers are never torn or stale reads.
    var serialized = Interlocked.Increment(ref _serialized);
    progress?.Report(new(ProgressEvent.FromCacheOrSerialized, serialized, Interlocked.Read(ref _total)));

    if (item?.NeedsStorage ?? false)
    {
      await Save(item.Value, cancellationToken).ConfigureAwait(false);
    }
    if (isEnd)
    {
      Done();
    }
    return closures;
  }

  /// <summary>
  /// Produces the JSON item for a single object. Kept synchronous on purpose.
  /// </summary>
  /// <param name="obj">Object to serialize; its <c>id</c> is expected to be set once serialization has run.</param>
  /// <param name="childClosures">Closure tables of the object's descendants, handed to the serializer.</param>
  /// <returns>
  /// <c>null</c> if the object was already serialized in this run.
  /// Otherwise an item; it is marked as needing storage unless its JSON came from the local SQLite cache.
  /// </returns>
  private BaseItem? Serialise(Base obj, List<Dictionary<string, int>> childClosures)
  {
    var id = obj.id;

    // Already serialized during this run: nothing to emit.
    if (id != null && _jsonCache.ContainsKey(id))
    {
      return null;
    }

    // The local cache already holds this object, so it does not need to be stored again.
    if (!_options.SkipCache && id != null)
    {
      var cachedJson = sqliteSendCacheManager.GetObject(id);
      if (cachedJson != null)
      {
        return new BaseItem(id, cachedJson, false);
      }
    }

    // Another task may have serialized this object since the check at the top.
    if (id is not null && _jsonCache.TryGetValue(id, out var existingJson))
    {
      return new BaseItem(id, existingJson, true);
    }

    SpeckleObjectSerializer2 serializer2 = new(speckleBasePropertyGatherer, childClosures);
    var json = serializer2.Serialize(obj);
    var newId = obj.id.NotNull();
    foreach (var kvp in serializer2.ObjectReferences)
    {
      _objectReferences.TryAdd(kvp.Key, kvp.Value);
    }
    _jsonCache.TryAdd(newId, json);
    if (id is not null && id != newId)
    {
      // The id can change during serialization (e.g. the id hash algorithm changed); remember both.
      _jsonCache.TryAdd(id, json);
    }
    return new BaseItem(newId, json, true);
  }

  /// <summary>
  /// Uploads a batch to the server unless the batch is empty or <see cref="SerializeProcessOptions.SkipServer"/> is set.
  /// </summary>
  /// <returns>The same <paramref name="batch"/> instance.</returns>
  public override async Task<List<BaseItem>> SendToServer(
    string streamId,
    List<BaseItem> batch,
    CancellationToken cancellationToken
  )
  {
    if (batch.Count > 0 && !_options.SkipServer)
    {
      await serverObjectManager.UploadObjects(streamId, batch, true, progress, cancellationToken).ConfigureAwait(false);
    }
    return batch;
  }

  /// <summary>
  /// Writes items to the local SQLite send cache, unless <see cref="SerializeProcessOptions.SkipCache"/> is set,
  /// and reports the running cached count.
  /// </summary>
  public override void SaveToCache(List<BaseItem> items)
  {
    if (_options.SkipCache)
    {
      return;
    }
    sqliteSendCacheManager.SaveObjects(items);
    // Atomic add: the previous Exchange(ref _cached, _cached + n) lost updates under concurrent batches.
    var cached = Interlocked.Add(ref _cached, items.Count);
    progress?.Report(new(ProgressEvent.CachedToLocal, cached, null));
  }
}