diff --git a/src/Speckle.Sdk/Api/Operations/Operations.Send.cs b/src/Speckle.Sdk/Api/Operations/Operations.Send.cs index ff425a33..82bdee14 100644 --- a/src/Speckle.Sdk/Api/Operations/Operations.Send.cs +++ b/src/Speckle.Sdk/Api/Operations/Operations.Send.cs @@ -50,6 +50,42 @@ public partial class Operations } } + /// + /// Sends a Speckle Object to the provided and (optionally) the default local cache + /// + /// + /// + /// When , an additional will be included + /// The or was + /// + /// using ServerTransport destination = new(account, streamId); + /// var (objectId, references) = await Send(mySpeckleObject, destination, true); + /// + public async Task<(string rootObjId, IReadOnlyDictionary convertedReferences)> Send( + Base value, + IServerTransport transport, + bool useDefaultCache, + IProgress? onProgressAction = null, + CancellationToken cancellationToken = default + ) + { + if (transport is null) + { + throw new ArgumentNullException(nameof(transport), "Expected a transport to be explicitly specified"); + } + + List transports = new() { transport }; + using SQLiteTransport2? localCache = useDefaultCache + ? new SQLiteTransport2(transport.StreamId) { TransportName = "LC2" } + : null; + if (localCache is not null) + { + transports.Add(localCache); + } + + return await Send(value, transports, onProgressAction, cancellationToken).ConfigureAwait(false); + } + /// /// Sends a Speckle Object to the provided and (optionally) the default local cache /// diff --git a/src/Speckle.Sdk/Serialisation/Utilities/SqlitePaths.cs b/src/Speckle.Sdk/Serialisation/Utilities/SqlitePaths.cs new file mode 100644 index 00000000..3d0b0f37 --- /dev/null +++ b/src/Speckle.Sdk/Serialisation/Utilities/SqlitePaths.cs @@ -0,0 +1,30 @@ +using Speckle.Sdk.Logging; +using Speckle.Sdk.Transports; + +namespace Speckle.Sdk.Serialisation.Utilities; + +public static class SqlitePaths +{ + private const string APPLICATION_NAME = "Speckle"; + private const string DATA_FOLDER = "Projects"; + private static readonly string basePath = SpecklePathProvider.UserApplicationDataPath(); + + public static string BlobStorageFolder => + SpecklePathProvider.BlobStoragePath(Path.Combine(basePath, APPLICATION_NAME)); + + public static string GetDBPath(string streamId) + { + var dir = Path.Combine(basePath, APPLICATION_NAME, DATA_FOLDER); + var db = Path.Combine(dir, $"{streamId}.db"); + try + { + Directory.CreateDirectory(dir); //ensure dir is there + return db; + } + catch (Exception ex) + when (ex is ArgumentException or IOException or UnauthorizedAccessException or NotSupportedException) + { + throw new TransportException($"Path was invalid or could not be created {db}", ex); + } + } +} diff --git a/src/Speckle.Sdk/Serialisation/V2/SQLiteCacheManager.cs b/src/Speckle.Sdk/Serialisation/V2/SQLiteCacheManager.cs index 801721c0..ad24cf82 100644 --- a/src/Speckle.Sdk/Serialisation/V2/SQLiteCacheManager.cs +++ b/src/Speckle.Sdk/Serialisation/V2/SQLiteCacheManager.cs @@ -1,33 +1,15 @@ using Microsoft.Data.Sqlite; -using Speckle.Sdk.Logging; -using Speckle.Sdk.Transports; +using Speckle.Sdk.Serialisation.Utilities; namespace Speckle.Sdk.Serialisation.V2; public abstract class SQLiteCacheManager { - private readonly string _rootPath; - private const string APPLICATION_NAME = "Speckle"; - private const string DATA_FOLDER = "Projects"; - protected SQLiteCacheManager(string streamId) { - var basePath = SpecklePathProvider.UserApplicationDataPath(); + string rootPath = SqlitePaths.GetDBPath(streamId); - try - { - var dir = Path.Combine(basePath, APPLICATION_NAME, DATA_FOLDER); - _rootPath = Path.Combine(dir, $"{streamId}.db"); - - Directory.CreateDirectory(dir); //ensure dir is there - } - catch (Exception ex) - when (ex is ArgumentException or IOException or UnauthorizedAccessException or NotSupportedException) - { - throw new TransportException($"Path was invalid or could not be created {_rootPath}", ex); - } - - ConnectionString = $"Data Source={_rootPath};"; + ConnectionString = $"Data Source={rootPath};"; Initialize(); } diff --git a/src/Speckle.Sdk/Transports/SQLiteTransport2.cs b/src/Speckle.Sdk/Transports/SQLiteTransport2.cs new file mode 100644 index 00000000..4dbaccd8 --- /dev/null +++ b/src/Speckle.Sdk/Transports/SQLiteTransport2.cs @@ -0,0 +1,408 @@ +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Text; +using System.Timers; +using Microsoft.Data.Sqlite; +using Speckle.Sdk.Logging; +using Speckle.Sdk.Models; +using Speckle.Sdk.Serialisation.Utilities; +using Timer = System.Timers.Timer; + +namespace Speckle.Sdk.Transports; + +public sealed class SQLiteTransport2 : IDisposable, ICloneable, ITransport, IBlobCapableTransport +{ + private readonly string _streamId; + private bool _isWriting; + private const int MAX_TRANSACTION_SIZE = 1000; + private const int POLL_INTERVAL = 500; + + private ConcurrentQueue<(string id, string serializedObject, int byteCount)> _queue = new(); + + /// + /// Timer that ensures queue is consumed if less than MAX_TRANSACTION_SIZE objects are being sent. + /// + private readonly Timer _writeTimer; + + public SQLiteTransport2(string streamId) + { + _streamId = streamId; + + _rootPath = SqlitePaths.GetDBPath(streamId); + + _connectionString = $"Data Source={_rootPath};"; + + Initialize(); + + _writeTimer = new Timer + { + AutoReset = true, + Enabled = false, + Interval = POLL_INTERVAL, + }; + _writeTimer.Elapsed += WriteTimerElapsed; + } + + private readonly string _rootPath; + + private readonly string _connectionString; + + private SqliteConnection Connection { get; set; } + private readonly SemaphoreSlim _connectionLock = new(1, 1); + + public string BlobStorageFolder => SqlitePaths.BlobStorageFolder; + + public void SaveBlob(Blob obj) + { + var blobPath = obj.originalPath; + var targetPath = obj.GetLocalDestinationPath(BlobStorageFolder); + File.Copy(blobPath, targetPath, true); + } + + public object Clone() + { + return new SQLiteTransport2(_streamId) + { + OnProgressAction = OnProgressAction, + CancellationToken = CancellationToken, + }; + } + + public void Dispose() + { + // TODO: Check if it's still writing? + Connection.Close(); + Connection.Dispose(); + _writeTimer.Dispose(); + _connectionLock.Dispose(); + } + + public string TransportName { get; set; } = "SQLite"; + + public Dictionary TransportContext => + new() + { + { "name", TransportName }, + { "type", GetType().Name }, + { "streamId", _streamId }, + { "blobStorageFolder", BlobStorageFolder }, + }; + + public CancellationToken CancellationToken { get; set; } + + public IProgress? OnProgressAction { get; set; } + + public int SavedObjectCount { get; private set; } + + public TimeSpan Elapsed { get; private set; } + + public void BeginWrite() + { + _queue = new(); + SavedObjectCount = 0; + } + + public void EndWrite() { } + + public Task> HasObjects(IReadOnlyList objectIds) + { + Dictionary ret = new(objectIds.Count); + // Initialize with false so that canceled queries still return a dictionary item for every object id + foreach (string objectId in objectIds) + { + ret[objectId] = false; + } + + try + { + const string COMMAND_TEXT = "SELECT 1 FROM objects WHERE hash = @hash LIMIT 1 "; + using var command = new SqliteCommand(COMMAND_TEXT, Connection); + + foreach (string objectId in objectIds) + { + CancellationToken.ThrowIfCancellationRequested(); + + command.Parameters.Clear(); + command.Parameters.AddWithValue("@hash", objectId); + + using var reader = command.ExecuteReader(); + bool rowFound = reader.Read(); + ret[objectId] = rowFound; + } + } + catch (SqliteException ex) + { + throw new TransportException("SQLite transport failed", ex); + } + + return Task.FromResult(ret); + } + + /// Failed to initialize connection to the SQLite DB + private void Initialize() + { + // NOTE: used for creating partioned object tables. + //string[] HexChars = new string[] { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" }; + //var cart = new List(); + //foreach (var str in HexChars) + // foreach (var str2 in HexChars) + // cart.Add(str + str2); + + using (var c = new SqliteConnection(_connectionString)) + { + c.Open(); + const string COMMAND_TEXT = + @" + CREATE TABLE IF NOT EXISTS objects( + hash TEXT PRIMARY KEY, + content TEXT + ) WITHOUT ROWID; + "; + using (var command = new SqliteCommand(COMMAND_TEXT, c)) + { + command.ExecuteNonQuery(); + } + + // Insert Optimisations + + using SqliteCommand cmd0 = new("PRAGMA journal_mode='wal';", c); + cmd0.ExecuteNonQuery(); + + //Note / Hack: This setting has the potential to corrupt the db. + //cmd = new SqliteCommand("PRAGMA synchronous=OFF;", Connection); + //cmd.ExecuteNonQuery(); + + using SqliteCommand cmd1 = new("PRAGMA count_changes=OFF;", c); + cmd1.ExecuteNonQuery(); + + using SqliteCommand cmd2 = new("PRAGMA temp_store=MEMORY;", c); + cmd2.ExecuteNonQuery(); + } + + Connection = new SqliteConnection(_connectionString); + Connection.Open(); + } + + /// + /// Returns all the objects in the store. Note: do not use for large collections. + /// + /// + /// This function uses a separate so is safe to call concurrently (unlike most other transport functions) + internal IEnumerable GetAllObjects() + { + CancellationToken.ThrowIfCancellationRequested(); + + using SqliteConnection connection = new(_connectionString); + connection.Open(); + + using var command = new SqliteCommand("SELECT * FROM objects", connection); + + using var reader = command.ExecuteReader(); + while (reader.Read()) + { + CancellationToken.ThrowIfCancellationRequested(); + yield return reader.GetString(1); + } + } + + /// + /// Deletes an object. Note: do not use for any speckle object transport, as it will corrupt the database. + /// + /// + public void DeleteObject(string hash) + { + CancellationToken.ThrowIfCancellationRequested(); + + using var command = new SqliteCommand("DELETE FROM objects WHERE hash = @hash", Connection); + command.Parameters.AddWithValue("@hash", hash); + command.ExecuteNonQuery(); + } + + /// + /// Updates an object. + /// + /// + /// + public void UpdateObject(string hash, string serializedObject) + { + CancellationToken.ThrowIfCancellationRequested(); + + using var c = new SqliteConnection(_connectionString); + c.Open(); + const string COMMAND_TEXT = "REPLACE INTO objects(hash, content) VALUES(@hash, @content)"; + using var command = new SqliteCommand(COMMAND_TEXT, c); + command.Parameters.AddWithValue("@hash", hash); + command.Parameters.AddWithValue("@content", serializedObject); + command.ExecuteNonQuery(); + } + + public override string ToString() + { + return $"Sqlite Transport @{_rootPath}"; + } + + #region Writes + + /// + /// Awaits untill write completion (ie, the current queue is fully consumed). + /// + /// + public async Task WriteComplete() => + await Utilities.WaitUntil(() => WriteCompletionStatus, 500).ConfigureAwait(false); + + /// + /// Returns true if the current write queue is empty and comitted. + /// + /// + public bool WriteCompletionStatus => _queue.IsEmpty && !_isWriting; + + private void WriteTimerElapsed(object? sender, ElapsedEventArgs e) + { + _writeTimer.Enabled = false; + + if (CancellationToken.IsCancellationRequested) + { + _queue = new ConcurrentQueue<(string, string, int)>(); + return; + } + + if (!_isWriting && !_queue.IsEmpty) + { + ConsumeQueue(); + } + } + + private void ConsumeQueue() + { + var stopwatch = Stopwatch.StartNew(); + _isWriting = true; + try + { + CancellationToken.ThrowIfCancellationRequested(); + + var i = 0; //BUG: This never gets incremented! + + var saved = 0; + + using (var c = new SqliteConnection(_connectionString)) + { + c.Open(); + using var t = c.BeginTransaction(); + const string COMMAND_TEXT = "INSERT OR IGNORE INTO objects(hash, content) VALUES(@hash, @content)"; + + while (i < MAX_TRANSACTION_SIZE && _queue.TryPeek(out var result)) + { + using var command = new SqliteCommand(COMMAND_TEXT, c, t); + _queue.TryDequeue(out result); + command.Parameters.AddWithValue("@hash", result.id); + command.Parameters.AddWithValue("@content", result.serializedObject); + command.ExecuteNonQuery(); + + saved++; + } + + t.Commit(); + CancellationToken.ThrowIfCancellationRequested(); + } + + CancellationToken.ThrowIfCancellationRequested(); + + if (!_queue.IsEmpty) + { + ConsumeQueue(); + } + } + catch (SqliteException ex) + { + throw new TransportException(this, "SQLite Command Failed", ex); + } + catch (OperationCanceledException) + { + _queue = new(); + } + finally + { + stopwatch.Stop(); + Elapsed += stopwatch.Elapsed; + _isWriting = false; + } + } + + /// + /// Adds an object to the saving queue. + /// + /// + /// + public void SaveObject(string id, string serializedObject) + { + CancellationToken.ThrowIfCancellationRequested(); + _queue.Enqueue((id, serializedObject, Encoding.UTF8.GetByteCount(serializedObject))); + + _writeTimer.Enabled = true; + _writeTimer.Start(); + } + + /// + /// Directly saves the object in the db. + /// + /// + /// + public void SaveObjectSync(string hash, string serializedObject) + { + const string COMMAND_TEXT = "INSERT OR IGNORE INTO objects(hash, content) VALUES(@hash, @content)"; + + try + { + using var command = new SqliteCommand(COMMAND_TEXT, Connection); + command.Parameters.AddWithValue("@hash", hash); + command.Parameters.AddWithValue("@content", serializedObject); + command.ExecuteNonQuery(); + } + catch (SqliteException ex) + { + throw new TransportException(this, "SQLite Command Failed", ex); + } + } + + #endregion + + #region Reads + + /// + /// Gets an object. + /// + /// + /// + public async Task GetObject(string id) + { + CancellationToken.ThrowIfCancellationRequested(); + await _connectionLock.WaitAsync(CancellationToken).ConfigureAwait(false); + var startTime = Stopwatch.GetTimestamp(); + try + { + using var command = new SqliteCommand("SELECT * FROM objects WHERE hash = @hash LIMIT 1 ", Connection); + command.Parameters.AddWithValue("@hash", id); + using var reader = command.ExecuteReader(); + if (reader.Read()) + { + return reader.GetString(1); + } + } + finally + { + Elapsed += LoggingHelpers.GetElapsedTime(startTime, Stopwatch.GetTimestamp()); + _connectionLock.Release(); + } + return null; // pass on the duty of null checks to consumers + } + + public async Task CopyObjectAndChildren(string id, ITransport targetTransport) + { + string res = await TransportHelpers + .CopyObjectAndChildrenAsync(id, this, targetTransport, CancellationToken) + .ConfigureAwait(false); + return res; + } + + #endregion +} diff --git a/tests/Speckle.Sdk.Tests.Unit/Transports/SQLiteTransport2Tests.cs b/tests/Speckle.Sdk.Tests.Unit/Transports/SQLiteTransport2Tests.cs new file mode 100644 index 00000000..737fbfac --- /dev/null +++ b/tests/Speckle.Sdk.Tests.Unit/Transports/SQLiteTransport2Tests.cs @@ -0,0 +1,164 @@ +using Microsoft.Data.Sqlite; +using NUnit.Framework; +using Speckle.Sdk.Common; +using Speckle.Sdk.Serialisation.Utilities; +using Speckle.Sdk.Transports; + +namespace Speckle.Sdk.Tests.Unit.Transports; + +[TestFixture] +[TestOf(nameof(SQLiteTransport2))] +public sealed class SQLiteTransport2Tests : TransportTests, IDisposable +{ + protected override ITransport? Sut => _sqlite; + + private SQLiteTransport2? _sqlite; + + private static readonly string s_name = $"test-{Guid.NewGuid()}"; + private static readonly string s_basePath = SqlitePaths.GetDBPath(s_name); + + [SetUp] + public void Setup() + { + _sqlite = new SQLiteTransport2(s_name); + } + + [TearDown] + public void TearDown() + { + _sqlite?.Dispose(); + SqliteConnection.ClearAllPools(); + File.Delete(s_basePath); + _sqlite = null; + } + + [Test] + public void DbCreated_AfterInitialization() + { + bool fileExists = File.Exists(s_basePath); + Assert.That(fileExists, Is.True); + } + + [Test] + [Description("Tests that an object can be updated")] + public async Task UpdateObject_AfterAdd() + { + const string PAYLOAD_ID = "MyTestObjectId"; + const string PAYLOAD_DATA = "MyTestObjectData"; + + _sqlite.NotNull().SaveObject(PAYLOAD_ID, PAYLOAD_DATA); + await _sqlite.WriteComplete(); + + const string NEW_PAYLOAD = "MyEvenBetterObjectData"; + _sqlite.UpdateObject(PAYLOAD_ID, NEW_PAYLOAD); + await _sqlite.WriteComplete(); + + var result = await _sqlite.GetObject(PAYLOAD_ID); + Assert.That(result, Is.EqualTo(NEW_PAYLOAD)); + } + + [Test] + [Description("Tests that updating an object that hasn't been saved previously adds the object to the DB")] + public async Task UpdateObject_WhenMissing() + { + const string PAYLOAD_ID = "MyTestObjectId"; + const string PAYLOAD_DATA = "MyTestObjectData"; + + var preUpdate = await _sqlite.NotNull().GetObject(PAYLOAD_ID); + Assert.That(preUpdate, Is.Null); + + _sqlite.UpdateObject(PAYLOAD_ID, PAYLOAD_DATA); + await _sqlite.WriteComplete(); + + var postUpdate = await _sqlite.GetObject(PAYLOAD_ID); + Assert.That(postUpdate, Is.EqualTo(PAYLOAD_DATA)); + } + + [Test] + public async Task SaveAndRetrieveObject_Sync() + { + const string PAYLOAD_ID = "MyTestObjectId"; + const string PAYLOAD_DATA = "MyTestObjectData"; + + var preAdd = await Sut.NotNull().GetObject(PAYLOAD_ID); + Assert.That(preAdd, Is.Null); + + _sqlite.NotNull().SaveObjectSync(PAYLOAD_ID, PAYLOAD_DATA); + + { + var postAdd = await Sut.GetObject(PAYLOAD_ID); + Assert.That(postAdd, Is.EqualTo(PAYLOAD_DATA)); + } + } + + [Test( + Description = "Tests that it is possible to enumerate through all objects of the transport while updating them, without getting stuck in an infinite loop" + )] + [Timeout(1000)] + public void UpdateObject_WhileEnumerating() + { + //I question if this is the behaviour we want, but AccountManager.GetObjects is relying on being able to update objects while enumerating over them + const string UPDATE_STRING = "_new"; + Dictionary testData = + new() + { + { "a", "This is object a" }, + { "b", "This is object b" }, + { "c", "This is object c" }, + { "d", "This is object d" }, + }; + int length = testData.Values.First().Length; + + foreach (var (key, data) in testData) + { + _sqlite.NotNull().SaveObjectSync(key, data); + } + + foreach (var o in _sqlite.NotNull().GetAllObjects()) + { + string newData = o + UPDATE_STRING; + string key = $"{o[length - 1]}"; + + _sqlite.UpdateObject(key, newData); + } + + //Assert that objects were updated + Assert.That(_sqlite.GetAllObjects().ToList(), Has.All.Contains(UPDATE_STRING)); + //Assert that objects were only updated once + Assert.That(_sqlite.GetAllObjects().ToList(), Has.All.Length.EqualTo(length + UPDATE_STRING.Length)); + } + + [Test] + [Repeat(10)] + [TestCase(6, 32)] + [Description( + $"Tests that the {nameof(SQLiteTransport2.GetAllObjects)} function can be called concurrently from multiple threads" + )] + public void GetAllObjects_IsThreadSafe(int dataSize, int parallelism) + { + foreach (int i in Enumerable.Range(0, dataSize)) + { + _sqlite.NotNull().SaveObjectSync(i.ToString(), Guid.NewGuid().ToString()); + } + + List[] results = new List[parallelism]; + Parallel.ForEach( + Enumerable.Range(0, parallelism), + i => + { + results[i] = _sqlite.NotNull().GetAllObjects().ToList(); + } + ); + + foreach (var result in results) + { + Assert.That(result, Is.EquivalentTo(results[0])); + Assert.That(result, Has.Count.EqualTo(dataSize)); + } + } + + public void Dispose() + { + _sqlite?.Dispose(); + } +}