add usage of a command pool

This commit is contained in:
Adam Hathcock
2024-12-20 09:26:36 +00:00
parent 63db2cb06e
commit 1b84c6cea7
3 changed files with 230 additions and 82 deletions
@@ -0,0 +1,103 @@
using System.Collections.Concurrent;
using Microsoft.Data.Sqlite;
namespace Speckle.Sdk.SQLite;
//inspired by https://github.com/neosmart/SqliteCache/blob/master/SqliteCache/DbCommandPool.cs
public sealed class CacheDbCommandPool : IDisposable
{
private const int INITIAL_CONCURRENCY = 4;
private readonly ConcurrentBag<SqliteCommand>[] _commands = new ConcurrentBag<SqliteCommand>[CacheDbCommands.Count];
private readonly ConcurrentBag<SqliteConnection> _connections = new();
private readonly string _connectionString;
public CacheDbCommandPool(string connectionString)
{
_connectionString = connectionString;
for (int i = 0; i < _commands.Length; ++i)
{
_commands[i] = new ConcurrentBag<SqliteCommand>();
}
for (int i = 0; i < INITIAL_CONCURRENCY; ++i)
{
var connection = new SqliteConnection(_connectionString);
connection.Open();
_connections.Add(connection);
}
}
public void Use(CacheOperation type, Action<SqliteCommand> handler)
{
Use<bool>(
type,
(cmd) =>
{
handler(cmd);
return true;
}
);
}
public T Use<T>(Func<SqliteConnection, T> handler)
{
if (!_connections.TryTake(out var db))
{
db = new SqliteConnection(_connectionString);
db.Open();
}
try
{
return handler(db);
}
finally
{
_connections.Add(db);
}
}
public T Use<T>(CacheOperation type, Func<SqliteCommand, T> handler)
{
return Use(
(conn) =>
{
var pool = _commands[(int)type];
if (!pool.TryTake(out var command))
{
#pragma warning disable CA2100
command = new SqliteCommand(CacheDbCommands.Commands[(int)type], conn);
#pragma warning restore CA2100
}
try
{
command.Connection = conn;
return handler(command);
}
finally
{
command.Connection = null;
command.Parameters.Clear();
pool.Add(command);
}
}
);
}
public void Dispose()
{
foreach (var pool in _commands)
{
while (pool.TryTake(out var cmd))
{
cmd.Dispose();
}
}
foreach (var conn in _connections)
{
conn.Close();
conn.Dispose();
}
}
}
+35
View File
@@ -0,0 +1,35 @@
namespace Speckle.Sdk.SQLite;
public enum CacheOperation
{
InsertOrIgnore,
InsertOrReplace,
Has,
Get,
Delete,
GetAll,
BulkInsertOrIgnore,
}
public static class CacheDbCommands
{
public static readonly string[] Commands;
public static readonly int Count = Enum.GetValues(typeof(CacheOperation)).Length;
#pragma warning disable CA1810
static CacheDbCommands()
#pragma warning restore CA1810
{
Commands = new string[Count];
Commands[(int)CacheOperation.InsertOrIgnore] =
"INSERT OR IGNORE INTO objects(hash, content) VALUES(@hash, @content)";
Commands[(int)CacheOperation.InsertOrReplace] = "REPLACE INTO objects(hash, content) VALUES(@hash, @content)";
Commands[(int)CacheOperation.Has] = "SELECT 1 FROM objects WHERE hash = @hash LIMIT 1";
Commands[(int)CacheOperation.Get] = "SELECT * FROM objects WHERE hash = @hash LIMIT 1";
Commands[(int)CacheOperation.Delete] = "DELETE FROM objects WHERE hash = @hash";
Commands[(int)CacheOperation.GetAll] = "SELECT * FROM objects";
Commands[(int)CacheOperation.BulkInsertOrIgnore] = "INSERT OR IGNORE INTO objects (hash, content) VALUES ";
}
}
@@ -1,19 +1,24 @@
using System.Text;
using Microsoft.Data.Sqlite;
using Speckle.InterfaceGenerator;
namespace Speckle.Sdk.SQLite;
[GenerateAutoInterface]
public class SqLiteJsonCacheManager : ISqLiteJsonCacheManager
public sealed class SqLiteJsonCacheManager : ISqLiteJsonCacheManager, IDisposable
{
private readonly string _connectionString;
private readonly CacheDbCommandPool _pool;
public SqLiteJsonCacheManager(string rootPath)
{
_connectionString = $"Data Source={rootPath};";
Initialize();
_pool = new CacheDbCommandPool(_connectionString);
}
public void Dispose() => _pool.Dispose();
private void Initialize()
{
// NOTE: used for creating partioned object tables.
@@ -59,98 +64,103 @@ public class SqLiteJsonCacheManager : ISqLiteJsonCacheManager
cmd4.ExecuteNonQuery();
}
public IEnumerable<string> GetAllObjects()
{
using var c = new SqliteConnection(_connectionString);
c.Open();
using var command = new SqliteCommand("SELECT * FROM objects", c);
public IReadOnlyCollection<string> GetAllObjects() =>
_pool.Use(
CacheOperation.GetAll,
command =>
{
var list = new HashSet<string>();
using var reader = command.ExecuteReader();
while (reader.Read())
{
list.Add(reader.GetString(1));
}
return list;
}
);
using var reader = command.ExecuteReader();
while (reader.Read())
{
yield return reader.GetString(1);
}
}
public void DeleteObject(string id) =>
_pool.Use(
CacheOperation.Delete,
command =>
{
command.Parameters.AddWithValue("@hash", id);
command.ExecuteNonQuery();
}
);
public void DeleteObject(string id)
{
using var c = new SqliteConnection(_connectionString);
c.Open();
using var command = new SqliteCommand("DELETE FROM objects WHERE hash = @hash", c);
command.Parameters.AddWithValue("@hash", id);
command.ExecuteNonQuery();
}
public string? GetObject(string id)
{
using var c = new SqliteConnection(_connectionString);
c.Open();
using var command = new SqliteCommand("SELECT * FROM objects WHERE hash = @hash LIMIT 1 ", c);
command.Parameters.AddWithValue("@hash", id);
using var reader = command.ExecuteReader();
if (reader.Read())
{
return reader.GetString(1);
}
return null; // pass on the duty of null checks to consumers
}
public string? GetObject(string id) =>
_pool.Use(
CacheOperation.Get,
command =>
{
command.Parameters.AddWithValue("@hash", id);
return (string?)command.ExecuteScalar();
}
);
//This does an insert or ignores if already exists
public void SaveObject(string id, string json)
{
using var c = new SqliteConnection(_connectionString);
c.Open();
const string COMMAND_TEXT = "INSERT OR IGNORE INTO objects(hash, content) VALUES(@hash, @content)";
using var command = new SqliteCommand(COMMAND_TEXT, c);
command.Parameters.AddWithValue("@hash", id);
command.Parameters.AddWithValue("@content", json);
command.ExecuteNonQuery();
}
public void SaveObject(string id, string json) =>
_pool.Use(
CacheOperation.InsertOrIgnore,
command =>
{
command.Parameters.AddWithValue("@hash", id);
command.Parameters.AddWithValue("@content", json);
command.ExecuteNonQuery();
}
);
//This does an insert or replaces if already exists
public void UpdateObject(string id, string json)
{
using var c = new SqliteConnection(_connectionString);
c.Open();
const string COMMAND_TEXT = "REPLACE INTO objects(hash, content) VALUES(@hash, @content)";
using var command = new SqliteCommand(COMMAND_TEXT, c);
command.Parameters.AddWithValue("@hash", id);
command.Parameters.AddWithValue("@content", json);
command.ExecuteNonQuery();
}
public void UpdateObject(string id, string json) =>
_pool.Use(
CacheOperation.InsertOrReplace,
command =>
{
command.Parameters.AddWithValue("@hash", id);
command.Parameters.AddWithValue("@content", json);
command.ExecuteNonQuery();
}
);
public void SaveObjects(IEnumerable<(string id, string json)> items)
{
using var c = new SqliteConnection(_connectionString);
c.Open();
using var t = c.BeginTransaction();
const string COMMAND_TEXT = "INSERT OR IGNORE INTO objects(hash, content) VALUES(@hash, @content)";
public void SaveObjects(IEnumerable<(string id, string json)> items) =>
_pool.Use(
CacheOperation.BulkInsertOrIgnore,
cmd =>
{
CreateBulkInsert(cmd, items);
return cmd.ExecuteNonQuery();
}
);
using var command = new SqliteCommand(COMMAND_TEXT, c);
command.Transaction = t;
var idParam = command.Parameters.Add("@hash", SqliteType.Text);
var jsonParam = command.Parameters.Add("@content", SqliteType.Text);
private void CreateBulkInsert(SqliteCommand cmd, IEnumerable<(string id, string json)> items)
{
StringBuilder sb = new();
sb.AppendLine(CacheDbCommands.Commands[(int)CacheOperation.BulkInsertOrIgnore]);
int i = 0;
foreach (var (id, json) in items)
{
idParam.Value = id;
jsonParam.Value = json;
command.ExecuteNonQuery();
sb.Append($"(@key{i}, @value{i},");
cmd.Parameters.AddWithValue($"@key{i}", id);
cmd.Parameters.AddWithValue($"@value{i}", json);
i++;
}
t.Commit();
sb.Remove(sb.Length - 1, 1);
sb.Append(';');
#pragma warning disable CA2100
cmd.CommandText = sb.ToString();
#pragma warning restore CA2100
}
public bool HasObject(string objectId)
{
using var c = new SqliteConnection(_connectionString);
c.Open();
const string COMMAND_TEXT = "SELECT 1 FROM objects WHERE hash = @hash LIMIT 1 ";
using var command = new SqliteCommand(COMMAND_TEXT, c);
command.Parameters.AddWithValue("@hash", objectId);
using var reader = command.ExecuteReader();
bool rowFound = reader.Read();
return rowFound;
}
public bool HasObject(string objectId) =>
_pool.Use(
CacheOperation.Has,
command =>
{
command.Parameters.AddWithValue("@hash", objectId);
using var reader = command.ExecuteReader();
bool rowFound = reader.Read();
return rowFound;
}
);
}