Pool object savers instead of sqlite

This commit is contained in:
Adam Hathcock
2025-06-10 11:15:01 +01:00
parent 44ba61e4a5
commit 7f8b59d348
11 changed files with 66 additions and 11 deletions
@@ -12,16 +12,19 @@ public partial interface ISqLiteJsonCacheManager : IDisposable;
public sealed class SqLiteJsonCacheManager : ISqLiteJsonCacheManager
{
private readonly CacheDbCommandPool _pool;
public string Path {get;}
public static ISqLiteJsonCacheManager FromMemory(int concurrency) => new SqLiteJsonCacheManager(concurrency);
private SqLiteJsonCacheManager(int concurrency)
{
Path = ":memory:";
//disable pooling as we pool ourselves
var builder = new SqliteConnectionStringBuilder
{
Pooling = false,
DataSource = ":memory:",
DataSource = Path,
Cache = SqliteCacheMode.Shared,
Mode = SqliteOpenMode.Memory,
};
@@ -34,8 +37,9 @@ public sealed class SqLiteJsonCacheManager : ISqLiteJsonCacheManager
private SqLiteJsonCacheManager(string path, int concurrency)
{
Path = path;
//disable pooling as we pool ourselves
var builder = new SqliteConnectionStringBuilder { Pooling = false, DataSource = path };
var builder = new SqliteConnectionStringBuilder { Pooling = false, DataSource = Path };
_pool = new CacheDbCommandPool(builder.ToString(), concurrency);
Initialize();
}
@@ -7,6 +7,10 @@ namespace Speckle.Sdk.Serialisation.V2;
public class MemoryJsonCacheManager(ConcurrentDictionary<Id, Json> jsonCache) : ISqLiteJsonCacheManager
#pragma warning restore CA1063
{
#pragma warning disable CA1065
public string Path => throw new NotImplementedException();
#pragma warning restore CA1065
public IReadOnlyCollection<(string Id, string Json)> GetAllObjects() =>
jsonCache.Select(x => (x.Key.Value, x.Value.Value)).ToList();
@@ -0,0 +1,36 @@
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.SQLite;
using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Serialisation.V2.Send;
public partial interface IObjectSaverFactory : IDisposable;
[GenerateAutoInterface]
public sealed class ObjectSaverFactory( IServerObjectManager serverObjectManager, ILoggerFactory loggerFactory) : IObjectSaverFactory
{private readonly ConcurrentDictionary<string, IObjectSaver> _savers = new();
public IObjectSaver Create(
ISqLiteJsonCacheManager sqLiteJsonCacheManager, IProgress<ProgressArgs>? progress, CancellationToken cancellationToken,
SerializeProcessOptions? options = null)
{
if (!_savers.TryGetValue(sqLiteJsonCacheManager.Path, out var saver))
{
saver = new ObjectSaver(progress,sqLiteJsonCacheManager, serverObjectManager, loggerFactory.CreateLogger<ObjectSaver>(),
cancellationToken, options);
_savers.TryAdd(sqLiteJsonCacheManager.Path, saver);
}
return saver;
}
[AutoInterfaceIgnore]
public void Dispose()
{
foreach (var pool in _savers)
{
pool.Value.Dispose();
}
_savers.Clear();
}
}
@@ -86,7 +86,6 @@ public sealed class SerializeProcess(
await WaitForSchedulerCompletion().ConfigureAwait(false);
await _highest.DisposeAsync().ConfigureAwait(false);
await _belowNormal.DisposeAsync().ConfigureAwait(false);
objectSaver.Dispose();
_processSource.Dispose();
}
@@ -13,6 +13,7 @@ public class SerializeProcessFactory(
IObjectSerializerFactory objectSerializerFactory,
ISqLiteJsonCacheManagerFactory sqLiteJsonCacheManagerFactory,
IServerObjectManagerFactory serverObjectManagerFactory,
IObjectSaverFactory objectSaverFactory,
ILoggerFactory loggerFactory
) : ISerializeProcessFactory
{
@@ -27,7 +28,7 @@ public class SerializeProcessFactory(
{
var sqLiteJsonCacheManager = sqLiteJsonCacheManagerFactory.CreateFromStream(streamId);
var serverObjectManager = serverObjectManagerFactory.Create(url, streamId, authorizationToken);
return CreateSerializeProcess(sqLiteJsonCacheManager, serverObjectManager, progress, cancellationToken, options);
return CreateSerializeProcess(sqLiteJsonCacheManager, serverObjectManager, progress, cancellationToken, options);
}
public ISerializeProcess CreateSerializeProcess(
@@ -39,13 +40,7 @@ public class SerializeProcessFactory(
) =>
new SerializeProcess(
progress,
new ObjectSaver(
progress,
sqLiteJsonCacheManager,
serverObjectManager,
loggerFactory.CreateLogger<ObjectSaver>(),
cancellationToken
),
objectSaverFactory.Create(sqLiteJsonCacheManager, progress, cancellationToken, options),
baseChildFinder,
new BaseSerializer(sqLiteJsonCacheManager, objectSerializerFactory),
loggerFactory,
+2
View File
@@ -97,6 +97,8 @@ public static class ServiceRegistration
typeof(Client)
);
serviceCollection.AddMatchingInterfacesAsTransient(typeof(GraphQLRetry).Assembly);
//we want to make object savers be singletons per stream so needs a singleton factory
serviceCollection.AddSingleton<IObjectSaverFactory, ObjectSaverFactory>();
return serviceCollection;
}
@@ -373,6 +373,9 @@ public class DummyServerObjectManager : IServerObjectManager
public class DummySendCacheManager(Dictionary<string, string> objects) : ISqLiteJsonCacheManager
{
#pragma warning disable CA1065
public string Path => throw new NotImplementedException();
#pragma warning restore CA1065
public void Dispose() { }
public IReadOnlyCollection<(string, string)> GetAllObjects() => throw new NotImplementedException();
@@ -4,6 +4,9 @@ namespace Speckle.Sdk.Serialization.Tests;
public class DummyCancellationSqLiteSendManager : ISqLiteJsonCacheManager
{
#pragma warning disable CA1065
public string Path => throw new NotImplementedException();
#pragma warning restore CA1065
public string? GetObject(string id) => null;
public void SaveObject(string id, string json) => throw new NotImplementedException();
@@ -4,6 +4,9 @@ namespace Speckle.Sdk.Serialization.Tests.Framework;
public class ExceptionSendCacheManager(bool? hasObject = null, int? exceptionsAfter = null) : ISqLiteJsonCacheManager
{
#pragma warning disable CA1065
public string Path => throw new NotImplementedException();
#pragma warning restore CA1065
private readonly object _lock = new();
private int _count;
@@ -5,6 +5,9 @@ namespace Speckle.Sdk.Testing.Framework;
public sealed class DummySqLiteReceiveManager(IReadOnlyDictionary<string, string> savedObjects)
: ISqLiteJsonCacheManager
{
#pragma warning disable CA1065
public string Path => throw new NotImplementedException();
#pragma warning restore CA1065
public void Dispose() { }
public IReadOnlyCollection<(string, string)> GetAllObjects() => throw new NotImplementedException();
@@ -4,6 +4,9 @@ namespace Speckle.Sdk.Testing.Framework;
public class DummySqLiteSendManager : ISqLiteJsonCacheManager
{
#pragma warning disable CA1065
public string Path => throw new NotImplementedException();
#pragma warning restore CA1065
public string? GetObject(string id) => throw new NotImplementedException();
public void SaveObject(string id, string json) => throw new NotImplementedException();