Sqlite pooling for connections and commands (#193)

* add ServerObjectManagerFactory

* add usage of a command pool

* add more disposal

* save saving increase

* fix tests

* fixes

* push out concurrency and disposablity

* Add a custom task scheduler

* Better usage, don't wait to enqueue to save to channels

* Completely pre-cal batch size to avoid spinning issues

* Try to fix cache counting

* properly dispose things

* format

* clean up

* adjust count and save on current thread

* move batch it's own file

* update a few packages

* fix build and add batch tests

* revert and format

* Revert "save saving increase"

This reverts commit 3b50c857fb.

* revert change

* adjust and add tests

* Dispose sqlite manager properly

* Make Batch a IMemoryOwner to allow for pooling

* Fix tests

* Upgrade some deps

* try to make tests more explicit

* remove return value

* Use named tuple for all objects
This commit is contained in:
Adam Hathcock
2025-01-08 11:04:32 +00:00
committed by GitHub
parent 11fe8e8cce
commit ed5bdc91ed
28 changed files with 486 additions and 203 deletions
+3 -3
View File
@@ -9,7 +9,7 @@
<PackageVersion Include="Microsoft.CSharp" Version="4.7.0" />
<!-- Keep at exactly 7.0.5 for side by side with V2 -->
<PackageVersion Include="Microsoft.Data.Sqlite" Version="7.0.5" />
<PackageVersion Include="Microsoft.Extensions.ObjectPool" Version="8.0.11" />
<PackageVersion Include="Microsoft.Extensions.ObjectPool" Version="9.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.2.0" />
@@ -19,7 +19,7 @@
<PackageVersion Include="NUnit3TestAdapter" Version="4.6.0" />
<PackageVersion Include="NUnit" Version="4.2.2" />
<PackageVersion Include="NUnit.Analyzers" Version="4.2.0" />
<PackageVersion Include="Open.ChannelExtensions" Version="8.6.0" />
<PackageVersion Include="Open.ChannelExtensions" Version="9.0.0" />
<PackageVersion Include="Polly" Version="7.2.3" />
<PackageVersion Include="Polly.Contrib.WaitAndRetry" Version="1.1.1" />
<PackageVersion Include="Polly.Extensions.Http" Version="3.0.0" />
@@ -27,7 +27,7 @@
<PackageVersion Include="Speckle.Newtonsoft.Json" Version="13.0.2" />
<PackageVersion Include="Speckle.DoubleNumerics" Version="4.0.1" />
<PackageVersion Include="SimpleExec" Version="12.0.0" />
<PackageVersion Include="System.Threading.Channels" Version="8.0.0" />
<PackageVersion Include="System.Threading.Channels" Version="9.0.0" />
<GlobalPackageReference Include="PolySharp" Version="1.15.0" />
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<GlobalPackageReference Include="GitVersion.MsBuild" Version="5.12.0" />
+6 -4
View File
@@ -1,4 +1,4 @@
using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Text;
using Microsoft.Extensions.ObjectPool;
@@ -6,6 +6,8 @@ namespace Speckle.Sdk.Dependencies;
public static class Pools
{
public const int DefaultCapacity = 50;
public static Pool<Dictionary<string, object?>> ObjectDictionaries { get; } = new(new ObjectDictionaryPolicy());
private sealed class ObjectDictionaryPolicy : IPooledObjectPolicy<Dictionary<string, object?>>
@@ -25,7 +27,7 @@ public static class Pools
private sealed class ObjectDictionaryPolicy<TKey, TValue> : IPooledObjectPolicy<Dictionary<TKey, TValue>>
where TKey : notnull
{
public Dictionary<TKey, TValue> Create() => new(50);
public Dictionary<TKey, TValue> Create() => new(DefaultCapacity);
public bool Return(Dictionary<TKey, TValue> obj)
{
@@ -38,7 +40,7 @@ public static class Pools
: IPooledObjectPolicy<ConcurrentDictionary<TKey, TValue>>
where TKey : notnull
{
public ConcurrentDictionary<TKey, TValue> Create() => new(Environment.ProcessorCount, 50);
public ConcurrentDictionary<TKey, TValue> Create() => new(Environment.ProcessorCount, DefaultCapacity);
public bool Return(ConcurrentDictionary<TKey, TValue> obj)
{
@@ -49,7 +51,7 @@ public static class Pools
private sealed class ObjectListPolicy<T> : IPooledObjectPolicy<List<T>>
{
public List<T> Create() => new(50);
public List<T> Create() => new(DefaultCapacity);
public bool Return(List<T> obj)
{
@@ -1,10 +1,14 @@
namespace Speckle.Sdk.Serialisation.V2.Send;
using System.Buffers;
using Speckle.Sdk.Dependencies;
public class Batch<T>(int capacity) : IHasSize
namespace Speckle.Sdk.Serialisation.V2.Send;
public sealed class Batch<T> : IHasSize, IMemoryOwner<T>
where T : IHasSize
{
private static readonly Pool<List<T>> _pool = Pools.CreateListPool<T>();
#pragma warning disable IDE0032
private readonly List<T> _items = new(capacity);
private readonly List<T> _items = _pool.Get();
private int _batchSize;
#pragma warning restore IDE0032
@@ -22,4 +26,8 @@ public class Batch<T>(int capacity) : IHasSize
public int Size => _batchSize;
public List<T> Items => _items;
public void Dispose() => _pool.Return(_items);
public Memory<T> Memory => new(_items.ToArray());
}
@@ -1,11 +1,12 @@
using System.Threading.Channels;
using System.Buffers;
using System.Threading.Channels;
using Open.ChannelExtensions;
namespace Speckle.Sdk.Serialisation.V2.Send;
public static class ChannelExtensions
{
public static BatchingChannelReader<T, Batch<T>> BatchBySize<T>(
public static BatchingChannelReader<T, IMemoryOwner<T>> BatchBySize<T>(
this ChannelReader<T> source,
int batchSize,
bool singleReader = false,
@@ -1,3 +1,4 @@
using System.Buffers;
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Serialisation.V2.Send;
@@ -13,7 +14,7 @@ public abstract class ChannelSaver<T>
private const int MAX_PARALLELISM_HTTP = 4;
private const int HTTP_CAPACITY = 500;
private const int MAX_CACHE_WRITE_PARALLELISM = 4;
private const int MAX_CACHE_BATCH = 200;
private const int MAX_CACHE_BATCH = 500;
private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
new BoundedChannelOptions(SEND_CAPACITY)
@@ -46,7 +47,13 @@ public abstract class ChannelSaver<T>
public ValueTask Save(T item, CancellationToken cancellationToken = default) =>
_checkCacheChannel.Writer.WriteAsync(item, cancellationToken);
public abstract Task<List<T>> SendToServer(Batch<T> batch, CancellationToken cancellationToken);
public async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch, CancellationToken cancellationToken)
{
await SendToServer((Batch<T>)batch, cancellationToken).ConfigureAwait(false);
return batch;
}
public abstract Task SendToServer(Batch<T> batch, CancellationToken cancellationToken);
public Task Done()
{
@@ -1,3 +1,4 @@
using System.Buffers;
using System.Threading.Channels;
using Open.ChannelExtensions;
@@ -13,20 +14,20 @@ public class SizeBatchingChannelReader<T>(
int batchSize,
bool singleReader,
bool syncCont = false
) : BatchingChannelReader<T, Batch<T>>(x => new(x), source, batchSize, singleReader, syncCont)
) : BatchingChannelReader<T, IMemoryOwner<T>>(x => new Batch<T>(), source, batchSize, singleReader, syncCont)
where T : IHasSize
{
protected override Batch<T> CreateBatch(int capacity) => new(capacity);
protected override IMemoryOwner<T> CreateBatch(int capacity) => new Batch<T>();
protected override void TrimBatch(ref Batch<T> batch, bool isVerifiedFull)
protected override void TrimBatch(ref IMemoryOwner<T> batch, bool isVerifiedFull)
{
if (!isVerifiedFull)
{
batch.TrimExcess();
((Batch<T>)batch).TrimExcess();
}
}
protected override void AddBatchItem(Batch<T> batch, T item) => batch.Add(item);
protected override void AddBatchItem(IMemoryOwner<T> batch, T item) => ((Batch<T>)batch).Add(item);
protected override int GetBatchSize(Batch<T> batch) => batch.Size;
protected override int GetBatchSize(IMemoryOwner<T> batch) => ((Batch<T>)batch).Size;
}
+26 -25
View File
@@ -19,9 +19,9 @@
},
"Microsoft.Extensions.ObjectPool": {
"type": "Direct",
"requested": "[8.0.11, )",
"resolved": "8.0.11",
"contentHash": "6ApKcHNJigXBfZa6XlDQ8feJpq7SG1ogZXg6M4FiNzgd6irs3LUAzo0Pfn4F2ZI9liGnH1XIBR/OtSbZmJAV5w=="
"requested": "[9.0.0, )",
"resolved": "9.0.0",
"contentHash": "UbsU/gYe4nv1DeqMXIVzDfNNek7Sk2kKuAOXL/Y+sLcAR0HwFUqzg1EPiU88jeHNe0g81aPvvHbvHarQr3r9IA=="
},
"Microsoft.SourceLink.GitHub": {
"type": "Direct",
@@ -44,13 +44,13 @@
},
"Open.ChannelExtensions": {
"type": "Direct",
"requested": "[8.6.0, )",
"resolved": "8.6.0",
"contentHash": "g5axz417bA6FXifJaBlB0l62gV7dYmknXx0n8lT/LSA3+7isaGMsOjJp5J+H/yXDRe4r+KZrE+bzQcs4Ets2kA==",
"requested": "[9.0.0, )",
"resolved": "9.0.0",
"contentHash": "DP+l5S6G46wcuY4I4kNXE+RDOmJr0DKuMienOdt0mMBN9z7vmLSC8YQbqCyb9i9LNjXj1tgCx5LyitJiRr/v7g==",
"dependencies": {
"Microsoft.Bcl.AsyncInterfaces": "8.0.0",
"System.Collections.Immutable": "8.0.0",
"System.Threading.Channels": "8.0.0"
"Microsoft.Bcl.AsyncInterfaces": "9.0.0",
"System.Collections.Immutable": "9.0.0",
"System.Threading.Channels": "9.0.0"
}
},
"Polly": {
@@ -88,10 +88,11 @@
},
"System.Threading.Channels": {
"type": "Direct",
"requested": "[8.0.0, )",
"resolved": "8.0.0",
"contentHash": "CMaFr7v+57RW7uZfZkPExsPB6ljwzhjACWW1gfU35Y56rk72B/Wu+sTqxVmGSk4SFUlPc3cjeKND0zktziyjBA==",
"requested": "[9.0.0, )",
"resolved": "9.0.0",
"contentHash": "hzACdIf1C+4Dqos5ijV404b94+LqfIC8nfS3mNpCDFWowb1N3PNfJPopneq32ahWlDeyaPZJqjBk76YFR69Rpg==",
"dependencies": {
"Microsoft.Bcl.AsyncInterfaces": "9.0.0",
"System.Threading.Tasks.Extensions": "4.5.4"
}
},
@@ -122,8 +123,8 @@
},
"System.Collections.Immutable": {
"type": "Transitive",
"resolved": "8.0.0",
"contentHash": "AurL6Y5BA1WotzlEvVaIDpqzpIPvYnnldxru8oXJU2yFxFUy3+pNXjXd1ymO+RA0rq0+590Q8gaz2l3Sr7fmqg==",
"resolved": "9.0.0",
"contentHash": "QhkXUl2gNrQtvPmtBTQHb0YsUrDiDQ2QS09YbtTTiSjGcf7NBqtYbrG/BE06zcBPCKEwQGzIv13IVdXNOSub2w==",
"dependencies": {
"System.Memory": "4.5.5",
"System.Runtime.CompilerServices.Unsafe": "6.0.0"
@@ -160,8 +161,8 @@
"Microsoft.Bcl.AsyncInterfaces": {
"type": "CentralTransitive",
"requested": "[5.0.0, )",
"resolved": "8.0.0",
"contentHash": "3WA9q9yVqJp222P3x1wYIGDAkpjAku0TMUaaQV22g6L67AI0LdOIrVS7Ht2vJfLHGSPVuqN94vIr15qn+HEkHw==",
"resolved": "9.0.0",
"contentHash": "owmu2Cr3IQ8yQiBleBHlGk8dSQ12oaF2e7TpzwJKEl4m84kkZJjEY1n33L67Y3zM5jPOjmmbdHjbfiL0RqcMRQ==",
"dependencies": {
"System.Threading.Tasks.Extensions": "4.5.4"
}
@@ -185,9 +186,9 @@
},
"Microsoft.Extensions.ObjectPool": {
"type": "Direct",
"requested": "[8.0.11, )",
"resolved": "8.0.11",
"contentHash": "6ApKcHNJigXBfZa6XlDQ8feJpq7SG1ogZXg6M4FiNzgd6irs3LUAzo0Pfn4F2ZI9liGnH1XIBR/OtSbZmJAV5w=="
"requested": "[9.0.0, )",
"resolved": "9.0.0",
"contentHash": "UbsU/gYe4nv1DeqMXIVzDfNNek7Sk2kKuAOXL/Y+sLcAR0HwFUqzg1EPiU88jeHNe0g81aPvvHbvHarQr3r9IA=="
},
"Microsoft.SourceLink.GitHub": {
"type": "Direct",
@@ -201,9 +202,9 @@
},
"Open.ChannelExtensions": {
"type": "Direct",
"requested": "[8.6.0, )",
"resolved": "8.6.0",
"contentHash": "g5axz417bA6FXifJaBlB0l62gV7dYmknXx0n8lT/LSA3+7isaGMsOjJp5J+H/yXDRe4r+KZrE+bzQcs4Ets2kA=="
"requested": "[9.0.0, )",
"resolved": "9.0.0",
"contentHash": "DP+l5S6G46wcuY4I4kNXE+RDOmJr0DKuMienOdt0mMBN9z7vmLSC8YQbqCyb9i9LNjXj1tgCx5LyitJiRr/v7g=="
},
"Polly": {
"type": "Direct",
@@ -240,9 +241,9 @@
},
"System.Threading.Channels": {
"type": "Direct",
"requested": "[8.0.0, )",
"resolved": "8.0.0",
"contentHash": "CMaFr7v+57RW7uZfZkPExsPB6ljwzhjACWW1gfU35Y56rk72B/Wu+sTqxVmGSk4SFUlPc3cjeKND0zktziyjBA=="
"requested": "[9.0.0, )",
"resolved": "9.0.0",
"contentHash": "hzACdIf1C+4Dqos5ijV404b94+LqfIC8nfS3mNpCDFWowb1N3PNfJPopneq32ahWlDeyaPZJqjBk76YFR69Rpg=="
},
"ILRepack": {
"type": "Transitive",
+13 -4
View File
@@ -21,11 +21,13 @@ using Stream = System.IO.Stream;
namespace Speckle.Sdk.Credentials;
public partial interface IAccountManager : IDisposable;
/// <summary>
/// Manage accounts locally for desktop applications.
/// </summary>
[GenerateAutoInterface]
public class AccountManager(
public sealed class AccountManager(
ISpeckleApplication application,
ILogger<AccountManager> logger,
ISpeckleHttp speckleHttp,
@@ -40,6 +42,13 @@ public class AccountManager(
"AccountAddFlow"
);
[AutoInterfaceIgnore]
public void Dispose()
{
_accountStorage.Dispose();
_accountAddLockStorage.Dispose();
}
/// <summary>
/// Gets the basic information about a server.
/// </summary>
@@ -321,7 +330,7 @@ public class AccountManager(
{
static bool IsInvalid(Account ac) => ac.userInfo == null || ac.serverInfo == null;
var sqlAccounts = _accountStorage.GetAllObjects().Select(x => JsonConvert.DeserializeObject<Account>(x));
var sqlAccounts = _accountStorage.GetAllObjects().Select(x => JsonConvert.DeserializeObject<Account>(x.Json));
var localAccounts = GetLocalAccounts();
foreach (var acc in sqlAccounts)
@@ -642,7 +651,7 @@ public class AccountManager(
}
// this uses the SQLite transport to store locks
var lockIds = _accountAddLockStorage.GetAllObjects().OrderByDescending(d => d).ToList();
var lockIds = _accountAddLockStorage.GetAllObjects().Select(x => x.Id).OrderByDescending(d => d).ToList();
var now = DateTime.Now;
foreach (var l in lockIds)
{
@@ -674,7 +683,7 @@ public class AccountManager(
{
s_isAddingAccount = false;
// make sure all old locks are removed
foreach (var id in _accountAddLockStorage.GetAllObjects())
foreach (var (id, _) in _accountAddLockStorage.GetAllObjects())
{
_accountAddLockStorage.DeleteObject(id);
}
@@ -0,0 +1,96 @@
using System.Collections.Concurrent;
using Microsoft.Data.Sqlite;
namespace Speckle.Sdk.SQLite;
//inspired by https://github.com/neosmart/SqliteCache/blob/master/SqliteCache/DbCommandPool.cs
public sealed class CacheDbCommandPool : IDisposable
{
private readonly ConcurrentBag<SqliteCommand>[] _commands = new ConcurrentBag<SqliteCommand>[CacheDbCommands.Count];
private readonly ConcurrentBag<SqliteConnection> _connections = new();
private readonly string _connectionString;
public CacheDbCommandPool(string connectionString, int concurrency)
{
_connectionString = connectionString;
for (int i = 0; i < _commands.Length; ++i)
{
_commands[i] = new ConcurrentBag<SqliteCommand>();
}
for (int i = 0; i < concurrency; ++i)
{
var connection = new SqliteConnection(_connectionString);
connection.Open();
_connections.Add(connection);
}
}
public void Use(CacheOperation type, Action<SqliteCommand> handler) =>
Use(
type,
cmd =>
{
handler(cmd);
return true;
}
);
private T Use<T>(Func<SqliteConnection, T> handler)
{
if (!_connections.TryTake(out var db))
{
db = new SqliteConnection(_connectionString);
db.Open();
}
try
{
return handler(db);
}
finally
{
_connections.Add(db);
}
}
public T Use<T>(CacheOperation type, Func<SqliteCommand, T> handler) =>
Use(conn =>
{
var pool = _commands[(int)type];
if (!pool.TryTake(out var command))
{
#pragma warning disable CA2100
command = new SqliteCommand(CacheDbCommands.Commands[(int)type], conn);
#pragma warning restore CA2100
}
try
{
command.Connection = conn;
return handler(command);
}
finally
{
command.Connection = null;
command.Parameters.Clear();
pool.Add(command);
}
});
public void Dispose()
{
foreach (var pool in _commands)
{
while (pool.TryTake(out var cmd))
{
cmd.Dispose();
}
}
foreach (var conn in _connections)
{
conn.Close();
conn.Dispose();
}
}
}
+35
View File
@@ -0,0 +1,35 @@
namespace Speckle.Sdk.SQLite;
public enum CacheOperation
{
InsertOrIgnore,
InsertOrReplace,
Has,
Get,
Delete,
GetAll,
BulkInsertOrIgnore,
}
public static class CacheDbCommands
{
public static readonly string[] Commands;
public static readonly int Count = Enum.GetValues(typeof(CacheOperation)).Length;
#pragma warning disable CA1810
static CacheDbCommands()
#pragma warning restore CA1810
{
Commands = new string[Count];
Commands[(int)CacheOperation.InsertOrIgnore] =
"INSERT OR IGNORE INTO objects(hash, content) VALUES(@hash, @content)";
Commands[(int)CacheOperation.InsertOrReplace] = "REPLACE INTO objects(hash, content) VALUES(@hash, @content)";
Commands[(int)CacheOperation.Has] = "SELECT 1 FROM objects WHERE hash = @hash LIMIT 1";
Commands[(int)CacheOperation.Get] = "SELECT content FROM objects WHERE hash = @hash LIMIT 1";
Commands[(int)CacheOperation.Delete] = "DELETE FROM objects WHERE hash = @hash";
Commands[(int)CacheOperation.GetAll] = "SELECT hash, content FROM objects";
Commands[(int)CacheOperation.BulkInsertOrIgnore] = "INSERT OR IGNORE INTO objects (hash, content) VALUES ";
}
}
+100 -84
View File
@@ -1,19 +1,28 @@
using System.Text;
using Microsoft.Data.Sqlite;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Dependencies;
namespace Speckle.Sdk.SQLite;
public partial interface ISqLiteJsonCacheManager : IDisposable;
[GenerateAutoInterface]
public class SqLiteJsonCacheManager : ISqLiteJsonCacheManager
public sealed class SqLiteJsonCacheManager : ISqLiteJsonCacheManager
{
private readonly string _connectionString;
private readonly CacheDbCommandPool _pool;
public SqLiteJsonCacheManager(string rootPath)
public SqLiteJsonCacheManager(string connectionString, int concurrency)
{
_connectionString = $"Data Source={rootPath};";
_connectionString = connectionString;
Initialize();
_pool = new CacheDbCommandPool(_connectionString, concurrency);
}
[AutoInterfaceIgnore]
public void Dispose() => _pool.Dispose();
private void Initialize()
{
// NOTE: used for creating partioned object tables.
@@ -57,100 +66,107 @@ public class SqLiteJsonCacheManager : ISqLiteJsonCacheManager
using SqliteCommand cmd4 = new("PRAGMA page_size = 32768;", c);
cmd4.ExecuteNonQuery();
c.Close();
}
public IEnumerable<string> GetAllObjects()
{
using var c = new SqliteConnection(_connectionString);
c.Open();
using var command = new SqliteCommand("SELECT * FROM objects", c);
public IReadOnlyCollection<(string Id, string Json)> GetAllObjects() =>
_pool.Use(
CacheOperation.GetAll,
command =>
{
var list = new HashSet<(string, string)>();
using var reader = command.ExecuteReader();
while (reader.Read())
{
list.Add((reader.GetString(0), reader.GetString(1)));
}
return list;
}
);
using var reader = command.ExecuteReader();
while (reader.Read())
{
yield return reader.GetString(1);
}
}
public void DeleteObject(string id) =>
_pool.Use(
CacheOperation.Delete,
command =>
{
command.Parameters.AddWithValue("@hash", id);
command.ExecuteNonQuery();
}
);
public void DeleteObject(string id)
{
using var c = new SqliteConnection(_connectionString);
c.Open();
using var command = new SqliteCommand("DELETE FROM objects WHERE hash = @hash", c);
command.Parameters.AddWithValue("@hash", id);
command.ExecuteNonQuery();
}
public string? GetObject(string id)
{
using var c = new SqliteConnection(_connectionString);
c.Open();
using var command = new SqliteCommand("SELECT * FROM objects WHERE hash = @hash LIMIT 1 ", c);
command.Parameters.AddWithValue("@hash", id);
using var reader = command.ExecuteReader();
if (reader.Read())
{
return reader.GetString(1);
}
return null; // pass on the duty of null checks to consumers
}
public string? GetObject(string id) =>
_pool.Use(
CacheOperation.Get,
command =>
{
command.Parameters.AddWithValue("@hash", id);
return (string?)command.ExecuteScalar();
}
);
//This does an insert or ignores if already exists
public void SaveObject(string id, string json)
{
using var c = new SqliteConnection(_connectionString);
c.Open();
const string COMMAND_TEXT = "INSERT OR IGNORE INTO objects(hash, content) VALUES(@hash, @content)";
using var command = new SqliteCommand(COMMAND_TEXT, c);
command.Parameters.AddWithValue("@hash", id);
command.Parameters.AddWithValue("@content", json);
command.ExecuteNonQuery();
}
public void SaveObject(string id, string json) =>
_pool.Use(
CacheOperation.InsertOrIgnore,
command =>
{
command.Parameters.AddWithValue("@hash", id);
command.Parameters.AddWithValue("@content", json);
command.ExecuteNonQuery();
}
);
//This does an insert or replaces if already exists
public void UpdateObject(string id, string json)
{
using var c = new SqliteConnection(_connectionString);
c.Open();
const string COMMAND_TEXT = "REPLACE INTO objects(hash, content) VALUES(@hash, @content)";
using var command = new SqliteCommand(COMMAND_TEXT, c);
command.Parameters.AddWithValue("@hash", id);
command.Parameters.AddWithValue("@content", json);
command.ExecuteNonQuery();
}
public void UpdateObject(string id, string json) =>
_pool.Use(
CacheOperation.InsertOrReplace,
command =>
{
command.Parameters.AddWithValue("@hash", id);
command.Parameters.AddWithValue("@content", json);
command.ExecuteNonQuery();
}
);
public void SaveObjects(IEnumerable<(string id, string json)> items)
{
using var c = new SqliteConnection(_connectionString);
c.Open();
using var t = c.BeginTransaction();
const string COMMAND_TEXT = "INSERT OR IGNORE INTO objects(hash, content) VALUES(@hash, @content)";
public void SaveObjects(IEnumerable<(string id, string json)> items) =>
_pool.Use(
CacheOperation.BulkInsertOrIgnore,
cmd =>
{
CreateBulkInsert(cmd, items);
return cmd.ExecuteNonQuery();
}
);
using var command = new SqliteCommand(COMMAND_TEXT, c);
command.Transaction = t;
var idParam = command.Parameters.Add("@hash", SqliteType.Text);
var jsonParam = command.Parameters.Add("@content", SqliteType.Text);
private void CreateBulkInsert(SqliteCommand cmd, IEnumerable<(string id, string json)> items)
{
StringBuilder sb = Pools.StringBuilders.Get();
sb.AppendLine(CacheDbCommands.Commands[(int)CacheOperation.BulkInsertOrIgnore]);
int i = 0;
foreach (var (id, json) in items)
{
idParam.Value = id;
jsonParam.Value = json;
command.ExecuteNonQuery();
sb.Append($"(@key{i}, @value{i}),");
cmd.Parameters.AddWithValue($"@key{i}", id);
cmd.Parameters.AddWithValue($"@value{i}", json);
i++;
}
t.Commit();
sb.Remove(sb.Length - 1, 1);
sb.Append(';');
#pragma warning disable CA2100
cmd.CommandText = sb.ToString();
#pragma warning restore CA2100
Pools.StringBuilders.Return(sb);
}
public bool HasObject(string objectId)
{
using var c = new SqliteConnection(_connectionString);
c.Open();
const string COMMAND_TEXT = "SELECT 1 FROM objects WHERE hash = @hash LIMIT 1 ";
using var command = new SqliteCommand(COMMAND_TEXT, c);
command.Parameters.AddWithValue("@hash", objectId);
using var reader = command.ExecuteReader();
bool rowFound = reader.Read();
return rowFound;
}
public bool HasObject(string objectId) =>
_pool.Use(
CacheOperation.Has,
command =>
{
command.Parameters.AddWithValue("@hash", objectId);
using var reader = command.ExecuteReader();
bool rowFound = reader.Read();
return rowFound;
}
);
}
@@ -7,10 +7,14 @@ namespace Speckle.Sdk.SQLite;
[GenerateAutoInterface]
public class SqLiteJsonCacheManagerFactory : ISqLiteJsonCacheManagerFactory
{
private ISqLiteJsonCacheManager Create(string path) => new SqLiteJsonCacheManager(path);
public const int INITIAL_CONCURRENCY = 4;
private ISqLiteJsonCacheManager Create(string path, int concurrency) =>
new SqLiteJsonCacheManager($"Data Source={path};", concurrency);
public ISqLiteJsonCacheManager CreateForUser(string scope) =>
Create(Path.Combine(SpecklePathProvider.UserApplicationDataPath(), "Speckle", $"{scope}.db"));
Create(Path.Combine(SpecklePathProvider.UserApplicationDataPath(), "Speckle", $"{scope}.db"), 1);
public ISqLiteJsonCacheManager CreateFromStream(string streamId) => Create(SqlitePaths.GetDBPath(streamId));
public ISqLiteJsonCacheManager CreateFromStream(string streamId) =>
Create(SqlitePaths.GetDBPath(streamId), INITIAL_CONCURRENCY);
}
@@ -5,9 +5,11 @@ using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Serialisation.V2;
public class DummySqLiteJsonCacheManager : ISqLiteJsonCacheManager
public sealed class DummySqLiteJsonCacheManager : ISqLiteJsonCacheManager
{
public IEnumerable<string> GetAllObjects() => throw new NotImplementedException();
public void Dispose() { }
public IReadOnlyCollection<(string, string)> GetAllObjects() => throw new NotImplementedException();
public void DeleteObject(string id) => throw new NotImplementedException();
@@ -13,6 +13,8 @@ public record DeserializeProcessOptions(
bool SkipInvalidConverts = false
);
public partial interface IDeserializeProcess : IDisposable;
[GenerateAutoInterface]
public sealed class DeserializeProcess(
IProgress<ProgressArgs>? progress,
@@ -30,6 +32,9 @@ public sealed class DeserializeProcess(
public IReadOnlyDictionary<string, Base> BaseCache => _baseCache;
public long Total { get; private set; }
[AutoInterfaceIgnore]
public void Dispose() => objectLoader.Dispose();
public async Task<Base> Deserialize(string rootId, CancellationToken cancellationToken)
{
var (rootJson, childrenIds) = await objectLoader
@@ -9,6 +9,8 @@ using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Serialisation.V2.Receive;
public partial interface IObjectLoader : IDisposable;
[GenerateAutoInterface]
public sealed class ObjectLoader(
ISqLiteJsonCacheManager sqLiteJsonCacheManager,
@@ -21,6 +23,9 @@ public sealed class ObjectLoader(
private long _cached;
private DeserializeProcessOptions _options = new(false);
[AutoInterfaceIgnore]
public void Dispose() => sqLiteJsonCacheManager.Dispose();
public async Task<(string, IReadOnlyCollection<string>)> GetAndCache(
string rootId,
DeserializeProcessOptions options,
@@ -0,0 +1,19 @@
using System.Text;
namespace Speckle.Sdk.Serialisation.V2.Send;
public readonly record struct BaseItem(Id Id, Json Json, bool NeedsStorage, Dictionary<Id, int>? Closures) : IHasSize
{
public int Size { get; } = Encoding.UTF8.GetByteCount(Json.Value);
public bool Equals(BaseItem? other)
{
if (other is null)
{
return false;
}
return string.Equals(Id.Value, other.Value.Id.Value, StringComparison.OrdinalIgnoreCase);
}
public override int GetHashCode() => Id.GetHashCode();
}
@@ -1,4 +1,4 @@
using System.Collections.Concurrent;
using System.Collections.Concurrent;
namespace Speckle.Sdk.Serialisation.V2.Send;
@@ -1,5 +1,4 @@
using System.Collections.Concurrent;
using System.Text;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Common;
using Speckle.Sdk.Dependencies;
@@ -23,22 +22,6 @@ public readonly record struct SerializeProcessResults(
IReadOnlyDictionary<Id, ObjectReference> ConvertedReferences
);
public readonly record struct BaseItem(Id Id, Json Json, bool NeedsStorage, Closures? Closures) : IHasSize
{
public int Size { get; } = Encoding.UTF8.GetByteCount(Json.Value);
public bool Equals(BaseItem? other)
{
if (other is null)
{
return false;
}
return string.Equals(Id.Value, other.Value.Id.Value, StringComparison.OrdinalIgnoreCase);
}
public override int GetHashCode() => Id.GetHashCode();
}
public partial interface ISerializeProcess : IDisposable;
[GenerateAutoInterface]
@@ -77,6 +60,7 @@ public sealed class SerializeProcess(
{
_highest.Dispose();
_belowNormal.Dispose();
sqLiteJsonCacheManager.Dispose();
}
public async Task<SerializeProcessResults> Serialize(Base root, CancellationToken cancellationToken)
@@ -223,7 +207,7 @@ public sealed class SerializeProcess(
return new BaseItem(id, json, true, closures);
}
public override async Task<List<BaseItem>> SendToServer(Batch<BaseItem> batch, CancellationToken cancellationToken)
public override async Task SendToServer(Batch<BaseItem> batch, CancellationToken cancellationToken)
{
if (!_options.SkipServer && batch.Items.Count != 0)
{
@@ -238,9 +222,7 @@ public sealed class SerializeProcess(
Interlocked.Exchange(ref _uploaded, _uploaded + batch.Items.Count);
}
progress?.Report(new(ProgressEvent.UploadedObjects, _uploaded, null));
return objectBatch;
}
return batch.Items;
}
public override void SaveToCache(List<BaseItem> batch)
@@ -1,5 +1,3 @@
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Logging;
using Speckle.Sdk.Serialisation.V2.Receive;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.SQLite;
@@ -23,20 +21,14 @@ public interface ISerializeProcessFactory
IProgress<ProgressArgs>? progress,
DeserializeProcessOptions? options = null
);
public ISerializeProcess CreateSerializeProcess(
SerializeProcessOptions? options = null,
IProgress<ProgressArgs>? progress = null
);
}
public class SerializeProcessFactory(
ISpeckleHttp speckleHttp,
ISdkActivityFactory activityFactory,
IBaseChildFinder baseChildFinder,
IObjectSerializerFactory objectSerializerFactory,
IObjectDeserializerFactory objectDeserializerFactory,
ISqLiteJsonCacheManagerFactory sqLiteJsonCacheManagerFactory
ISqLiteJsonCacheManagerFactory sqLiteJsonCacheManagerFactory,
IServerObjectManagerFactory serverObjectManagerFactory
) : ISerializeProcessFactory
{
public ISerializeProcess CreateSerializeProcess(
@@ -48,24 +40,7 @@ public class SerializeProcessFactory(
)
{
var sqLiteJsonCacheManager = sqLiteJsonCacheManagerFactory.CreateFromStream(streamId);
var serverObjectManager = new ServerObjectManager(speckleHttp, activityFactory, url, streamId, authorizationToken);
return new SerializeProcess(
progress,
sqLiteJsonCacheManager,
serverObjectManager,
baseChildFinder,
objectSerializerFactory,
options
);
}
public ISerializeProcess CreateSerializeProcess(
SerializeProcessOptions? options = null,
IProgress<ProgressArgs>? progress = null
)
{
var sqLiteJsonCacheManager = new DummySqLiteJsonCacheManager();
var serverObjectManager = new DummySendServerObjectManager();
var serverObjectManager = serverObjectManagerFactory.Create(url, streamId, authorizationToken);
return new SerializeProcess(
progress,
sqLiteJsonCacheManager,
@@ -85,9 +60,12 @@ public class SerializeProcessFactory(
)
{
var sqLiteJsonCacheManager = sqLiteJsonCacheManagerFactory.CreateFromStream(streamId);
var serverObjectManager = new ServerObjectManager(speckleHttp, activityFactory, url, streamId, authorizationToken);
var serverObjectManager = serverObjectManagerFactory.Create(url, streamId, authorizationToken);
#pragma warning disable CA2000
//owned by process, refactor later
var objectLoader = new ObjectLoader(sqLiteJsonCacheManager, serverObjectManager, progress);
#pragma warning restore CA2000
return new DeserializeProcess(progress, objectLoader, objectDeserializerFactory, options);
}
}
@@ -0,0 +1,13 @@
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Logging;
namespace Speckle.Sdk.Serialisation.V2;
[GenerateAutoInterface]
public class ServerObjectManagerFactory(ISpeckleHttp speckleHttp, ISdkActivityFactory activityFactory)
: IServerObjectManagerFactory
{
public IServerObjectManager Create(Uri url, string streamId, string? authorizationToken, int timeoutSeconds = 120) =>
new ServerObjectManager(speckleHttp, activityFactory, url, streamId, authorizationToken, timeoutSeconds);
}
@@ -3,9 +3,7 @@ using System.Reflection;
using Microsoft.Extensions.DependencyInjection;
using Speckle.Sdk;
using Speckle.Sdk.Credentials;
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Host;
using Speckle.Sdk.Logging;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation.V2;
using Speckle.Sdk.Serialisation.V2.Receive;
@@ -43,12 +41,11 @@ var token = serviceProvider.GetRequiredService<IAccountManager>().GetDefaultAcco
var progress = new Progress(true);
var factory = new SerializeProcessFactory(
serviceProvider.GetRequiredService<ISpeckleHttp>(),
serviceProvider.GetRequiredService<ISdkActivityFactory>(),
new BaseChildFinder(new BasePropertyGatherer()),
new ObjectSerializerFactory(new BasePropertyGatherer()),
new ObjectDeserializerFactory(),
serviceProvider.GetRequiredService<ISqLiteJsonCacheManagerFactory>()
serviceProvider.GetRequiredService<ISqLiteJsonCacheManagerFactory>(),
serviceProvider.GetRequiredService<IServerObjectManagerFactory>()
);
var process = factory.CreateDeserializeProcess(new Uri(url), streamId, token, progress, new(skipCacheReceive));
var @base = await process.Deserialize(rootId, default).ConfigureAwait(false);
@@ -529,7 +529,9 @@ public class DummyServerObjectManager : IServerObjectManager
public class DummySendCacheManager(Dictionary<string, string> objects) : ISqLiteJsonCacheManager
{
public IEnumerable<string> GetAllObjects() => throw new NotImplementedException();
public void Dispose() { }
public IReadOnlyCollection<(string, string)> GetAllObjects() => throw new NotImplementedException();
public void DeleteObject(string id) => throw new NotImplementedException();
@@ -4,7 +4,9 @@ namespace Speckle.Sdk.Serialization.Tests;
public class DummySqLiteReceiveManager(Dictionary<string, string> savedObjects) : ISqLiteJsonCacheManager
{
public IEnumerable<string> GetAllObjects() => throw new NotImplementedException();
public void Dispose() { }
public IReadOnlyCollection<(string, string)> GetAllObjects() => throw new NotImplementedException();
public void DeleteObject(string id) => throw new NotImplementedException();
@@ -14,7 +14,9 @@ public class DummySqLiteSendManager : ISqLiteJsonCacheManager
public bool HasObject(string objectId) => throw new NotImplementedException();
public IEnumerable<string> GetAllObjects() => throw new NotImplementedException();
public IReadOnlyCollection<(string, string)> GetAllObjects() => throw new NotImplementedException();
public void DeleteObject(string id) => throw new NotImplementedException();
public void Dispose() { }
}
@@ -33,6 +33,8 @@ public class SerializationTests
}
public string? LoadId(string id) => null;
public void Dispose() { }
}
private readonly Assembly _assembly = Assembly.GetExecutingAssembly();
@@ -103,6 +105,8 @@ public class SerializationTests
}
public string? LoadId(string id) => idToObject.GetValueOrDefault(id);
public void Dispose() { }
}
[Test]
@@ -154,7 +158,7 @@ public class SerializationTests
var fullName = _assembly.GetManifestResourceNames().Single(x => x.EndsWith(fileName));
var json = await ReadJson(fullName);
var closures = ReadAsObjects(json);
var process = new DeserializeProcess(null, new TestObjectLoader(closures), new ObjectDeserializerFactory());
using var process = new DeserializeProcess(null, new TestObjectLoader(closures), new ObjectDeserializerFactory());
await process.Deserialize("3416d3fe01c9196115514c4a2f41617b", default);
foreach (var (id, objJson) in closures)
{
@@ -251,7 +255,7 @@ public class SerializationTests
new DummyReceiveServerObjectManager(closure),
null
);
var process = new DeserializeProcess(null, o, new ObjectDeserializerFactory(), new(true));
using var process = new DeserializeProcess(null, o, new ObjectDeserializerFactory(), new(true));
var root = await process.Deserialize(rootId, default);
process.BaseCache.Count.ShouldBe(oldCount);
process.Total.ShouldBe(oldCount);
@@ -57,7 +57,7 @@ public class GeneralDeserializer : IDisposable
null
);
var o = new ObjectLoader(sqlite, serverObjects, null);
var process = new DeserializeProcess(null, o, new ObjectDeserializerFactory(), new(skipCache));
using var process = new DeserializeProcess(null, o, new ObjectDeserializerFactory(), new(skipCache));
return await process.Deserialize(rootId, default).ConfigureAwait(false);
}
@@ -0,0 +1,91 @@
using Microsoft.Data.Sqlite;
using NUnit.Framework;
using Shouldly;
using Speckle.Sdk.Common;
using Speckle.Sdk.SQLite;
namespace Speckle.Sdk.Tests.Unit.SQLite;
[TestFixture]
public class SQLiteJsonCacheManagerTests
{
private readonly string _basePath = $"{Guid.NewGuid()}.db";
private string? _connectionString;
[SetUp]
public void Setup() => _connectionString = $"Data Source={_basePath};";
[TearDown]
public void TearDown()
{
if (File.Exists(_basePath))
{
SqliteConnection.ClearAllPools();
GC.Collect();
GC.WaitForPendingFinalizers();
File.Delete(_basePath);
}
}
[Test]
public void TestGetAll()
{
var data = new List<(string id, string json)>() { ("id1", "1"), ("id2", "2") };
using var manager = new SqLiteJsonCacheManager(_connectionString.NotNull(), 2);
manager.SaveObjects(data);
var items = manager.GetAllObjects();
items.Count.ShouldBe(data.Count);
var i = items.ToDictionary();
foreach (var (id, json) in data)
{
i.TryGetValue(id, out var j).ShouldBeTrue();
j.ShouldBe(json);
}
}
[Test]
public void TestGet()
{
var data = new List<(string id, string json)>() { ("id1", "1"), ("id2", "2") };
using var manager = new SqLiteJsonCacheManager(_connectionString.NotNull(), 2);
foreach (var d in data)
{
manager.SaveObject(d.id, d.json);
}
foreach (var d in data)
{
manager.SaveObject(d.id, d.json);
}
var items = manager.GetAllObjects();
items.Count.ShouldBe(data.Count);
var id1 = data[0].id;
var json1 = manager.GetObject(id1);
json1.ShouldBe(data[0].json);
manager.HasObject(id1).ShouldBeTrue();
manager.UpdateObject(id1, "3");
json1 = manager.GetObject(id1);
json1.ShouldBe("3");
manager.HasObject(id1).ShouldBeTrue();
manager.DeleteObject(id1);
json1 = manager.GetObject(id1);
json1.ShouldBeNull();
manager.HasObject(id1).ShouldBeFalse();
manager.UpdateObject(id1, "3");
json1 = manager.GetObject(id1);
json1.ShouldBe("3");
manager.HasObject(id1).ShouldBeTrue();
var id2 = data[1].id;
var json2 = manager.GetObject(id2);
json2.ShouldBe(data[1].json);
manager.HasObject(id2).ShouldBeTrue();
manager.DeleteObject(id2);
json2 = manager.GetObject(id2);
json2.ShouldBeNull();
manager.HasObject(id2).ShouldBeFalse();
}
}
@@ -1,5 +1,6 @@
using NUnit.Framework;
using NUnit.Framework;
using Shouldly;
using Speckle.Sdk.Dependencies;
using Speckle.Sdk.Serialisation.V2.Send;
namespace Speckle.Sdk.Tests.Unit.Serialisation;
@@ -20,7 +21,7 @@ public class BatchTests
[Test]
public void TestBatchSize_Calc()
{
var batch = new Batch<BatchItem>(4);
using var batch = new Batch<BatchItem>();
batch.Add(new BatchItem(1));
batch.Size.ShouldBe(1);
batch.Add(new BatchItem(2));
@@ -30,12 +31,12 @@ public class BatchTests
[Test]
public void TestBatchSize_Trim()
{
var batch = new Batch<BatchItem>(4);
using var batch = new Batch<BatchItem>();
batch.Add(new BatchItem(1));
batch.Add(new BatchItem(2));
batch.Size.ShouldBe(3);
batch.Items.Capacity.ShouldBe(4);
batch.Items.Capacity.ShouldBe(Pools.DefaultCapacity);
batch.TrimExcess();
batch.Items.Capacity.ShouldBe(2);