Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 50906b172a | |||
| 05f7353925 | |||
| 8328498553 | |||
| 59019bf846 | |||
| 3afaf61a1a | |||
| 424609fad0 | |||
| 46c067308e | |||
| 0e33e8df8f | |||
| bc81c21e9d | |||
| 7f8b59d348 | |||
| 44ba61e4a5 |
@@ -13,8 +13,8 @@ public abstract class ChannelSaver<T>
|
||||
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
|
||||
private const int MAX_PARALLELISM_HTTP = 4;
|
||||
private const int HTTP_CAPACITY = 500;
|
||||
private const int MAX_CACHE_WRITE_PARALLELISM = 4;
|
||||
private const int MAX_CACHE_BATCH = 500;
|
||||
private const int MAX_CACHE_WRITE_PARALLELISM = 1;
|
||||
private const int MAX_CACHE_BATCH = 1000;
|
||||
|
||||
private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
|
||||
new BoundedChannelOptions(SEND_CAPACITY)
|
||||
@@ -45,9 +45,9 @@ public abstract class ChannelSaver<T>
|
||||
cancellationToken
|
||||
)
|
||||
.Join()
|
||||
.Batch(cacheBatchSize ?? MAX_CACHE_BATCH)
|
||||
.Batch(cacheBatchSize ?? MAX_CACHE_BATCH, singleReader: true)
|
||||
.WithTimeout(HTTP_BATCH_TIMEOUT)
|
||||
.ReadAllConcurrently(maxParallelism ?? MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
|
||||
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
|
||||
.ContinueWith(
|
||||
t =>
|
||||
{
|
||||
|
||||
@@ -13,7 +13,26 @@ public sealed class SqLiteJsonCacheManager : ISqLiteJsonCacheManager
|
||||
{
|
||||
private readonly CacheDbCommandPool _pool;
|
||||
|
||||
public SqLiteJsonCacheManager(string path, int concurrency)
|
||||
public static ISqLiteJsonCacheManager FromMemory(int concurrency) => new SqLiteJsonCacheManager(concurrency);
|
||||
|
||||
private SqLiteJsonCacheManager(int concurrency)
|
||||
{
|
||||
//disable pooling as we pool ourselves
|
||||
var builder = new SqliteConnectionStringBuilder
|
||||
{
|
||||
Pooling = false,
|
||||
DataSource = ":memory:",
|
||||
Cache = SqliteCacheMode.Shared,
|
||||
Mode = SqliteOpenMode.Memory,
|
||||
};
|
||||
_pool = new CacheDbCommandPool(builder.ToString(), concurrency);
|
||||
Initialize();
|
||||
}
|
||||
|
||||
public static ISqLiteJsonCacheManager FromFilePath(string path, int concurrency) =>
|
||||
new SqLiteJsonCacheManager(path, concurrency);
|
||||
|
||||
private SqLiteJsonCacheManager(string path, int concurrency)
|
||||
{
|
||||
//disable pooling as we pool ourselves
|
||||
var builder = new SqliteConnectionStringBuilder { Pooling = false, DataSource = path };
|
||||
@@ -47,12 +66,6 @@ public sealed class SqLiteJsonCacheManager : ISqLiteJsonCacheManager
|
||||
command.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
// Insert Optimisations
|
||||
|
||||
//Note / Hack: This setting has the potential to corrupt the db.
|
||||
//cmd = new SqliteCommand("PRAGMA synchronous=OFF;", Connection);
|
||||
//cmd.ExecuteNonQuery();
|
||||
|
||||
using (SqliteCommand cmd1 = new("PRAGMA count_changes=OFF;", c))
|
||||
{
|
||||
cmd1.ExecuteNonQuery();
|
||||
|
||||
@@ -9,7 +9,8 @@ public class SqLiteJsonCacheManagerFactory : ISqLiteJsonCacheManagerFactory
|
||||
{
|
||||
public const int INITIAL_CONCURRENCY = 4;
|
||||
|
||||
private ISqLiteJsonCacheManager Create(string path, int concurrency) => new SqLiteJsonCacheManager(path, concurrency);
|
||||
private ISqLiteJsonCacheManager Create(string path, int concurrency) =>
|
||||
SqLiteJsonCacheManager.FromFilePath(path, concurrency);
|
||||
|
||||
public ISqLiteJsonCacheManager CreateForUser(string scope) =>
|
||||
Create(Path.Combine(SpecklePathProvider.UserApplicationDataPath(), "Speckle", $"{scope}.db"), 1);
|
||||
|
||||
@@ -16,8 +16,8 @@ public record SerializeProcessOptions(
|
||||
bool SkipFindTotalObjects = false
|
||||
)
|
||||
{
|
||||
public int? MaxHttpSendSize { get; set; }
|
||||
public int? MaxCacheSize { get; set; }
|
||||
public int? MaxHttpSendBatchSize { get; set; }
|
||||
public int? MaxCacheBatchSize { get; set; }
|
||||
public int? MaxParallelism { get; set; }
|
||||
}
|
||||
|
||||
@@ -112,8 +112,8 @@ public sealed class SerializeProcess(
|
||||
{
|
||||
var channelTask = objectSaver.Start(
|
||||
options?.MaxParallelism,
|
||||
options?.MaxHttpSendSize,
|
||||
options?.MaxCacheSize,
|
||||
options?.MaxHttpSendBatchSize,
|
||||
options?.MaxCacheBatchSize,
|
||||
_processSource.Token
|
||||
);
|
||||
var findTotalObjectsTask = Task.CompletedTask;
|
||||
|
||||
@@ -64,18 +64,10 @@ public class SerializeProcessFactory(
|
||||
#pragma warning disable CA2000
|
||||
var memoryJsonCacheManager = new MemoryJsonCacheManager(jsonCache);
|
||||
#pragma warning restore CA2000
|
||||
return new SerializeProcess(
|
||||
return CreateSerializeProcess(
|
||||
memoryJsonCacheManager,
|
||||
new MemoryServerObjectManager(objects),
|
||||
progress,
|
||||
new ObjectSaver(
|
||||
progress,
|
||||
memoryJsonCacheManager,
|
||||
new MemoryServerObjectManager(objects),
|
||||
loggerFactory.CreateLogger<ObjectSaver>(),
|
||||
cancellationToken
|
||||
),
|
||||
baseChildFinder,
|
||||
new BaseSerializer(memoryJsonCacheManager, objectSerializerFactory),
|
||||
loggerFactory,
|
||||
cancellationToken,
|
||||
options
|
||||
);
|
||||
|
||||
@@ -123,7 +123,7 @@ public class DetachedTests
|
||||
objects,
|
||||
null,
|
||||
default,
|
||||
new SerializeProcessOptions(false, false, true, true) { MaxParallelism = 1, MaxHttpSendSize = 1 }
|
||||
new SerializeProcessOptions(false, false, true, true) { MaxParallelism = 1, MaxHttpSendBatchSize = 1 }
|
||||
);
|
||||
var results = await serializeProcess.Serialize(@base);
|
||||
|
||||
@@ -150,7 +150,7 @@ public class DetachedTests
|
||||
objects,
|
||||
null,
|
||||
default,
|
||||
new SerializeProcessOptions(false, false, true, true) { MaxParallelism = 1, MaxHttpSendSize = 1 }
|
||||
new SerializeProcessOptions(false, false, true, true) { MaxParallelism = 1, MaxHttpSendBatchSize = 1 }
|
||||
);
|
||||
var results = await serializeProcess.Serialize(@base);
|
||||
|
||||
@@ -172,7 +172,7 @@ public class DetachedTests
|
||||
objects,
|
||||
null,
|
||||
default,
|
||||
new SerializeProcessOptions(false, false, true, true) { MaxParallelism = 1, MaxHttpSendSize = 1 }
|
||||
new SerializeProcessOptions(false, false, true, true) { MaxParallelism = 1, MaxHttpSendBatchSize = 1 }
|
||||
);
|
||||
var results = await serializeProcess.Serialize(@base);
|
||||
|
||||
|
||||
@@ -95,8 +95,8 @@ public class ExceptionTests
|
||||
default,
|
||||
new SerializeProcessOptions(false, false, false, true)
|
||||
{
|
||||
MaxHttpSendSize = 1,
|
||||
MaxCacheSize = 1,
|
||||
MaxHttpSendBatchSize = 1,
|
||||
MaxCacheBatchSize = 1,
|
||||
MaxParallelism = 1,
|
||||
}
|
||||
);
|
||||
|
||||
@@ -14,6 +14,7 @@ using Speckle.Sdk.Serialisation.V2;
|
||||
using Speckle.Sdk.Serialisation.V2.Receive;
|
||||
using Speckle.Sdk.Serialisation.V2.Send;
|
||||
using Speckle.Sdk.Serialization.Tests.Framework;
|
||||
using Speckle.Sdk.SQLite;
|
||||
using Speckle.Sdk.Testing.Framework;
|
||||
|
||||
namespace Speckle.Sdk.Serialization.Tests;
|
||||
@@ -50,45 +51,45 @@ public class SerializationTests
|
||||
public void Dispose() { }
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("RevitObject.json.gz")]
|
||||
public async Task Basic_Namespace_Validation(string fileName)
|
||||
{
|
||||
var closures = TestFileManager.GetFileAsClosures(fileName);
|
||||
var deserializer = new SpeckleObjectDeserializer
|
||||
/* [Theory]
|
||||
[InlineData("RevitObject.json.gz")]
|
||||
public async Task Basic_Namespace_Validation(string fileName)
|
||||
{
|
||||
ReadTransport = new TestTransport(closures),
|
||||
CancellationToken = default,
|
||||
};
|
||||
|
||||
foreach (var (id, objJson) in closures)
|
||||
{
|
||||
var jObject = JObject.Parse(objJson);
|
||||
var oldSpeckleType = jObject["speckle_type"].NotNull().Value<string>().NotNull();
|
||||
var starts = oldSpeckleType.StartsWith("Speckle.Core.") || oldSpeckleType.StartsWith("Objects.");
|
||||
starts.Should().BeTrue($"{oldSpeckleType} isn't expected");
|
||||
|
||||
var baseType = await deserializer.DeserializeAsync(objJson);
|
||||
baseType.id.Should().Be(id);
|
||||
|
||||
var oldType = TypeLoader.GetAtomicType(oldSpeckleType);
|
||||
if (oldType == typeof(Base))
|
||||
var closures = TestFileManager.GetFileAsClosures(fileName);
|
||||
var deserializer = new SpeckleObjectDeserializer
|
||||
{
|
||||
oldSpeckleType.Should().NotContain("Base");
|
||||
}
|
||||
else
|
||||
ReadTransport = new TestTransport(closures),
|
||||
CancellationToken = default,
|
||||
};
|
||||
|
||||
foreach (var (id, objJson) in closures)
|
||||
{
|
||||
starts = baseType.speckle_type.StartsWith("Speckle.Core.") || baseType.speckle_type.StartsWith("Objects.");
|
||||
starts.Should().BeTrue($"{baseType.speckle_type} isn't expected");
|
||||
|
||||
var type = TypeLoader.GetAtomicType(baseType.speckle_type);
|
||||
type.Should().NotBeNull();
|
||||
var name = TypeLoader.GetTypeString(type) ?? throw new ArgumentNullException($"Could not find: {type}");
|
||||
starts = name.StartsWith("Speckle.Core") || name.StartsWith("Objects");
|
||||
starts.Should().BeTrue($"{name} isn't expected");
|
||||
var jObject = JObject.Parse(objJson);
|
||||
var oldSpeckleType = jObject["speckle_type"].NotNull().Value<string>().NotNull();
|
||||
var starts = oldSpeckleType.StartsWith("Speckle.Core.") || oldSpeckleType.StartsWith("Objects.");
|
||||
starts.Should().BeTrue($"{oldSpeckleType} isn't expected");
|
||||
|
||||
var baseType = await deserializer.DeserializeAsync(objJson);
|
||||
baseType.id.Should().Be(id);
|
||||
|
||||
var oldType = TypeLoader.GetAtomicType(oldSpeckleType);
|
||||
if (oldType == typeof(Base))
|
||||
{
|
||||
oldSpeckleType.Should().NotContain("Base");
|
||||
}
|
||||
else
|
||||
{
|
||||
starts = baseType.speckle_type.StartsWith("Speckle.Core.") || baseType.speckle_type.StartsWith("Objects.");
|
||||
starts.Should().BeTrue($"{baseType.speckle_type} isn't expected");
|
||||
|
||||
var type = TypeLoader.GetAtomicType(baseType.speckle_type);
|
||||
type.Should().NotBeNull();
|
||||
var name = TypeLoader.GetTypeString(type) ?? throw new ArgumentNullException($"Could not find: {type}");
|
||||
starts = name.StartsWith("Speckle.Core") || name.StartsWith("Objects");
|
||||
starts.Should().BeTrue($"{name} isn't expected");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}*/
|
||||
|
||||
[Theory]
|
||||
[InlineData("RevitObject.json.gz")]
|
||||
@@ -184,9 +185,16 @@ public class SerializationTests
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("RevitObject.json.gz", "3416d3fe01c9196115514c4a2f41617b", 7818, 4674)]
|
||||
public async Task Roundtrip_Test_New(string fileName, string rootId, int oldCount, int newCount)
|
||||
[InlineData(1)]
|
||||
[InlineData(2)]
|
||||
[InlineData(3)]
|
||||
[InlineData(4)]
|
||||
public async Task Roundtrip_Test_New(int concurrency)
|
||||
{
|
||||
string fileName = "RevitObject.json.gz";
|
||||
string rootId = "3416d3fe01c9196115514c4a2f41617b";
|
||||
int oldCount = 7818;
|
||||
int newCount = 4674;
|
||||
var closures = TestFileManager.GetFileAsClosures(fileName);
|
||||
closures.Count.Should().Be(oldCount);
|
||||
|
||||
@@ -218,11 +226,11 @@ public class SerializationTests
|
||||
|
||||
await using (
|
||||
var serializeProcess = _factory.CreateSerializeProcess(
|
||||
new ConcurrentDictionary<Id, Json>(),
|
||||
newIdToJson,
|
||||
SqLiteJsonCacheManager.FromMemory(1),
|
||||
new MemoryServerObjectManager(newIdToJson),
|
||||
null,
|
||||
default,
|
||||
new SerializeProcessOptions(true, true, false, true)
|
||||
new SerializeProcessOptions(false, false, false, true) { MaxCacheBatchSize = 1, MaxParallelism = concurrency }
|
||||
)
|
||||
)
|
||||
{
|
||||
|
||||
@@ -22,7 +22,7 @@ public class SQLiteJsonCacheManagerTests : IDisposable
|
||||
public void TestGetAll()
|
||||
{
|
||||
var data = new List<(string id, string json)>() { ("id1", "1"), ("id2", "2") };
|
||||
using var manager = new SqLiteJsonCacheManager(_basePath, 2);
|
||||
using var manager = SqLiteJsonCacheManager.FromFilePath(_basePath, 2);
|
||||
manager.SaveObjects(data);
|
||||
var items = manager.GetAllObjects();
|
||||
items.Count.Should().Be(data.Count);
|
||||
@@ -38,7 +38,7 @@ public class SQLiteJsonCacheManagerTests : IDisposable
|
||||
public void TestGet()
|
||||
{
|
||||
var data = new List<(string id, string json)>() { ("id1", "1"), ("id2", "2") };
|
||||
using var manager = new SqLiteJsonCacheManager(_basePath, 2);
|
||||
using var manager = SqLiteJsonCacheManager.FromFilePath(_basePath, 2);
|
||||
foreach (var d in data)
|
||||
{
|
||||
manager.SaveObject(d.id, d.json);
|
||||
@@ -84,7 +84,7 @@ public class SQLiteJsonCacheManagerTests : IDisposable
|
||||
public void TestLargeJsonPayload()
|
||||
{
|
||||
var largeJson = new string('a', 100_000);
|
||||
using var manager = new SqLiteJsonCacheManager(_basePath, 2);
|
||||
using var manager = SqLiteJsonCacheManager.FromFilePath(_basePath, 2);
|
||||
manager.SaveObject("large", largeJson);
|
||||
var result = manager.GetObject("large");
|
||||
result.Should().Be(largeJson);
|
||||
@@ -96,7 +96,7 @@ public class SQLiteJsonCacheManagerTests : IDisposable
|
||||
var id = "spécial_字符_!@#$%^&*()";
|
||||
var json = /*lang=json,strict*/
|
||||
"{\"value\": \"特殊字符!@#$%^&*()\"}";
|
||||
using var manager = new SqLiteJsonCacheManager(_basePath, 2);
|
||||
using var manager = SqLiteJsonCacheManager.FromFilePath(_basePath, 2);
|
||||
manager.SaveObject(id, json);
|
||||
var result = manager.GetObject(id);
|
||||
result.Should().Be(json);
|
||||
@@ -108,7 +108,7 @@ public class SQLiteJsonCacheManagerTests : IDisposable
|
||||
[Fact]
|
||||
public void TestBulkInsertEmptyCollection()
|
||||
{
|
||||
using var manager = new SqLiteJsonCacheManager(_basePath, 2);
|
||||
using var manager = SqLiteJsonCacheManager.FromFilePath(_basePath, 2);
|
||||
manager.SaveObjects(new List<(string, string)>());
|
||||
manager.GetAllObjects().Count.Should().Be(0);
|
||||
}
|
||||
@@ -116,7 +116,7 @@ public class SQLiteJsonCacheManagerTests : IDisposable
|
||||
[Fact]
|
||||
public void TestRepeatedUpdateAndDelete()
|
||||
{
|
||||
using var manager = new SqLiteJsonCacheManager(_basePath, 2);
|
||||
using var manager = SqLiteJsonCacheManager.FromFilePath(_basePath, 2);
|
||||
manager.SaveObject("id", "1");
|
||||
manager.UpdateObject("id", "2");
|
||||
manager.UpdateObject("id", "3");
|
||||
@@ -129,7 +129,7 @@ public class SQLiteJsonCacheManagerTests : IDisposable
|
||||
[Fact]
|
||||
public void TestGetAndDeleteNonExistentId()
|
||||
{
|
||||
using var manager = new SqLiteJsonCacheManager(_basePath, 2);
|
||||
using var manager = SqLiteJsonCacheManager.FromFilePath(_basePath, 2);
|
||||
manager.GetObject("doesnotexist").Should().BeNull();
|
||||
manager.HasObject("doesnotexist").Should().BeFalse();
|
||||
manager.DeleteObject("doesnotexist"); // Should not throw
|
||||
@@ -138,7 +138,7 @@ public class SQLiteJsonCacheManagerTests : IDisposable
|
||||
[Fact]
|
||||
public void TestNullOrEmptyInput()
|
||||
{
|
||||
using var manager = new SqLiteJsonCacheManager(_basePath, 2);
|
||||
using var manager = SqLiteJsonCacheManager.FromFilePath(_basePath, 2);
|
||||
// Empty id
|
||||
Assert.Throws<ArgumentException>(() => manager.SaveObject("", "emptyid"));
|
||||
// Empty json
|
||||
|
||||
Reference in New Issue
Block a user