Compare commits

..

2 Commits

Author SHA1 Message Date
Jedd Morgan a560f7f159 more blobs poc 2025-11-24 18:28:50 +00:00
JR-Morgan 94d2a01880 blobs-poc 2025-11-13 11:01:26 +03:00
29 changed files with 417 additions and 589 deletions
@@ -0,0 +1,41 @@
using System.Threading.Channels;
namespace Speckle.Sdk.Dependencies;
/// <summary>
/// Fans every written item out to all subscribers. Each call to <see cref="Subscribe"/> creates
/// its own unbounded, single-reader <see cref="Channel{T}"/> and every item passed to
/// <see cref="WriteAsync"/> is written to each of those channels in subscription order.
/// </summary>
/// <remarks>
/// The subscriber list is a plain <see cref="List{T}"/> with no synchronisation: adding a subscriber
/// while another thread is enumerating the list (writing or completing) is not safe.
/// A subscriber only receives items written after it subscribed.
/// </remarks>
internal sealed class BroadcastChannel<T>
{
  private readonly List<Channel<T>> _subscribers = [];

  /// <summary>Registers a new subscriber and returns the reader side of its private channel.</summary>
  public ChannelReader<T> Subscribe()
  {
    var channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions() { SingleReader = true });
    _subscribers.Add(channel);
    return channel.Reader;
  }

  /// <summary>Writes <paramref name="item"/> to every subscriber, one after the other.</summary>
  /// <exception cref="ChannelClosedException">A subscriber's writer has already been completed.</exception>
  public async Task WriteAsync(T item, CancellationToken cancellationToken)
  {
    foreach (var sub in _subscribers)
    {
      await sub.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
    }
  }

  /// <summary>
  /// <see langword="true"/> when every subscriber's reader has completed
  /// (also <see langword="true"/> when there are no subscribers).
  /// </summary>
  public bool IsReadingCompleted()
  {
    return _subscribers.All(x => x.Reader.Completion.IsCompleted);
  }

  /// <summary>Marks every subscriber channel as complete for writing. Safe to call more than once.</summary>
  public void CompleteWriters()
  {
    foreach (var sub in _subscribers)
    {
      // TryComplete rather than Complete: Complete throws ChannelClosedException when the
      // writer is already completed, which would turn a repeated call into a crash.
      sub.Writer.TryComplete();
    }
  }

  /// <summary>Completes once every subscriber's reader has completed.</summary>
  public async Task CompleteReaders()
  {
    await Task.WhenAll(_subscribers.Select(x => x.Reader.Completion)).ConfigureAwait(false);
  }
}
@@ -6,28 +6,23 @@ namespace Speckle.Sdk.Serialisation.V2.Send;
public sealed class Batch<T> : IMemoryOwner<T>
where T : IHasByteSize
{
private static readonly Pool<List<T>> _pool = Pools.CreateListPool<T>();
#pragma warning disable IDE0032
private readonly List<T> _items = _pool.Get();
private int _batchByteSize;
#pragma warning restore IDE0032
private static readonly Pool<List<T>> s_pool = Pools.CreateListPool<T>();
public List<T> Items { get; } = s_pool.Get();
public int BatchByteSize { get; private set; }
public void Add(T item)
{
_items.Add(item);
_batchByteSize += item.ByteSize;
Items.Add(item);
BatchByteSize += item.ByteSize;
}
public void TrimExcess()
{
_items.TrimExcess();
_batchByteSize = _items.Sum(x => x.ByteSize);
Items.TrimExcess();
BatchByteSize = Items.Sum(x => x.ByteSize);
}
public int BatchByteSize => _batchByteSize;
public List<T> Items => _items;
public void Dispose() => s_pool.Return(Items);
public void Dispose() => _pool.Return(_items);
public Memory<T> Memory => new(_items.ToArray());
public Memory<T> Memory => new(Items.ToArray());
}
@@ -1,74 +1,134 @@
using System.Buffers;
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Serialisation.V2.Send;
namespace Speckle.Sdk.Dependencies.Serialization;
public abstract class ChannelSaver<T>
where T : IHasByteSize
public abstract class ChannelSaver<TItem, TBlobItem>
where TItem : IHasByteSize
where TBlobItem : IHasByteSize, TItem
{
private const int SEND_CAPACITY = 10000;
private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes
private const int BLOB_SEND_CHUNK_SIZE = 10; //count
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
private const int MAX_PARALLELISM_HTTP = 4;
private const int HTTP_CAPACITY = 500;
private const int MAX_CACHE_WRITE_PARALLELISM = 1;
private const int MAX_CACHE_BATCH = 1000;
private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
new BoundedChannelOptions(SEND_CAPACITY)
{
AllowSynchronousContinuations = true,
Capacity = SEND_CAPACITY,
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait,
},
_ => throw new NotImplementedException("Dropping items not supported.")
);
private readonly BroadcastChannel<TItem> _broadcastChannel = new();
public Task Start(
public async Task Start(
int? maxParallelism,
int? httpBatchSize,
int? blobSendCache,
int? cacheBatchSize,
CancellationToken cancellationToken
) =>
_checkCacheChannel
.Reader.BatchByByteSize(httpBatchSize ?? HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
maxParallelism ?? MAX_PARALLELISM_HTTP,
async x => await SendToServer(x).ConfigureAwait(false),
HTTP_CAPACITY,
false,
)
{
maxParallelism ??= MAX_PARALLELISM_HTTP;
httpBatchSize ??= HTTP_SEND_CHUNK_SIZE;
blobSendCache ??= BLOB_SEND_CHUNK_SIZE;
cacheBatchSize ??= MAX_CACHE_BATCH;
await StartInternal(
maxParallelism.Value,
httpBatchSize.Value,
blobSendCache.Value,
cacheBatchSize.Value,
cancellationToken
)
.Join()
.Batch(cacheBatchSize ?? MAX_CACHE_BATCH, singleReader: true)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
.ContinueWith(
t =>
{
Exception? ex = t.Exception;
if (ex is null && t.Status is TaskStatus.Canceled && !cancellationToken.IsCancellationRequested)
{
ex = new OperationCanceledException();
}
.ConfigureAwait(false);
}
if (ex is not null)
{
RecordException(ex);
}
_checkCacheChannel.Writer.TryComplete(ex);
},
cancellationToken,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Current
private Task StartInternal(
int maxParallelism,
int httpBatchSize,
int blobSendCache,
int cacheBatchSize,
CancellationToken cancellationToken
)
{
Task serverSend = _broadcastChannel
.Subscribe()
.BatchByByteSize(httpBatchSize)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrentlyAsync(
maxParallelism,
async x => await SendToServer(x).ConfigureAwait(false),
cancellationToken
);
public async Task SaveAsync(T item, CancellationToken cancellationToken)
Task writeCache = _broadcastChannel
.Subscribe()
.Batch(cacheBatchSize)
.ReadAll(SaveToCache, true, cancellationToken: cancellationToken)
.AsTask();
Task blobsCache = _broadcastChannel
.Subscribe()
.OfType<TItem, TBlobItem>()
.BatchByByteSize(blobSendCache)
.ReadAllAsync(
async x => await SendBlobToServer(x).ConfigureAwait(false),
true,
cancellationToken: cancellationToken
)
.AsTask();
return Task.WhenAll(serverSend, writeCache, blobsCache);
// return _broadcastChannel
// .Subscribe()
// .BatchByByteSize(httpBatchSize ?? HTTP_SEND_CHUNK_SIZE)
// .WithTimeout(HTTP_BATCH_TIMEOUT)
// .PipeAsync(
// maxParallelism ?? MAX_PARALLELISM_HTTP,
// async x => await SendToServer(x).ConfigureAwait(false),
// HTTP_CAPACITY,
// false,
// cancellationToken
// )
// .Join()
// .Batch(cacheBatchSize ?? MAX_CACHE_BATCH, singleReader: true)
// .WithTimeout(HTTP_BATCH_TIMEOUT)
// .ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
// .ContinueWith(
// t =>
// {
// Exception? ex = t.Exception;
// if (ex is null && t.Status is TaskStatus.Canceled && !cancellationToken.IsCancellationRequested)
// {
// ex = new OperationCanceledException();
// }
//
// if (ex is not null)
// {
// RecordException(ex);
// }
//
// _checkCacheChannel.Writer.TryComplete(ex);
// },
// cancellationToken,
// TaskContinuationOptions.ExecuteSynchronously,
// TaskScheduler.Current
// );
}
/// <summary>
/// Forwards a batch of blob items to <see cref="SendBlobToServerInternal"/>.
/// Any exception (including a failed cast of <paramref name="batch"/>) is handed to
/// <c>RecordException</c> instead of being rethrown.
/// </summary>
private async ValueTask SendBlobToServer(IMemoryOwner<TBlobItem> batch)
{
  try
  {
    // The cast stays inside the try so an InvalidCastException is recorded like any other failure.
    var blobBatch = (Batch<TBlobItem>)batch;
    await SendBlobToServerInternal(blobBatch).ConfigureAwait(false);
  }
#pragma warning disable CA1031
  catch (Exception error)
#pragma warning restore CA1031
  {
    RecordException(error);
  }
}
protected abstract Task SendBlobToServerInternal(Batch<TBlobItem> batch);
public async Task SaveAsync(TItem item, CancellationToken cancellationToken)
{
if (Exception is not null)
{
@@ -76,36 +136,34 @@ public abstract class ChannelSaver<T>
}
//can switch to check then try pattern when back pressure is needed or exceptions are too much
//the trees don't need to respond to back pressure
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
await _broadcastChannel.WriteAsync(item, cancellationToken).ConfigureAwait(false);
}
private async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch)
private async Task SendToServer(IMemoryOwner<TItem> batch)
{
try
{
await SendToServerInternal((Batch<T>)batch).ConfigureAwait(false);
return batch;
await SendToServerInternal((Batch<TItem>)batch).ConfigureAwait(false);
}
#pragma warning disable CA1031
catch (Exception ex)
#pragma warning restore CA1031
{
RecordException(ex);
return batch;
}
}
protected abstract Task SendToServerInternal(Batch<T> batch);
protected abstract Task SendToServerInternal(Batch<TItem> batch);
public abstract void SaveToCache(List<T> item);
public abstract void SaveToCache(List<TItem> item);
public void DoneTraversing() => _checkCacheChannel.Writer.TryComplete();
public void DoneTraversing() => _broadcastChannel.CompleteWriters();
public async Task DoneSaving()
{
if (!_checkCacheChannel.Reader.Completion.IsCompleted)
if (!_broadcastChannel.IsReadingCompleted())
{
await _checkCacheChannel.Reader.Completion.ConfigureAwait(false);
await _broadcastChannel.CompleteReaders().ConfigureAwait(false);
}
}
@@ -114,6 +172,5 @@ public abstract class ChannelSaver<T>
private void RecordException(Exception ex)
{
Exception = ex;
_checkCacheChannel.Writer.TryComplete(ex);
}
}
-2
View File
@@ -35,7 +35,6 @@ public sealed class Client : ISpeckleGraphQLClient, IClient
public WorkspaceResource Workspace { get; }
public ServerResource Server { get; }
public FileImportResource FileImport { get; }
public IngestResource Ingest { get; }
public Uri ServerUrl => new(Account.serverInfo.url);
@@ -72,7 +71,6 @@ public sealed class Client : ISpeckleGraphQLClient, IClient
Workspace = new(this);
Server = new(this);
FileImport = new(this, blobApiFactory.Create(account));
Ingest = new(this);
}
[AutoInterfaceIgnore]
@@ -1,19 +0,0 @@
namespace Speckle.Sdk.Api.GraphQL.Inputs;
/// <summary>
/// Value sent as the <c>$input</c> variable of the <c>IngestCreate</c> mutation.
/// Parameter names are camelCase so that they serialise to the GraphQL field names.
/// </summary>
public record IngestCreateInput(
  string fileName,
  int? maxIdleTimeoutMinutes,
  string modelId,
  string projectId,
  string sourceApplication,
  string sourceApplicationVersion,
  IReadOnlyDictionary<string, object?> sourceFileData
);

/// <summary>Value sent as the <c>$input</c> variable of the <c>IngestEnd</c> mutation.</summary>
public record IngestFinishInput(
  string id,
  string? message,
  string objectId,
  string projectId
);

/// <summary>Value sent as the <c>$input</c> variable of the <c>IngestError</c> mutation.</summary>
public record IngestErrorInput(
  string errorReason,
  string errorStacktrace,
  string id,
  string projectId
);

/// <summary>Value sent as the <c>$input</c> variable of the <c>IngestCancel</c> mutation.</summary>
public record CancelRequestInput(
  string id,
  string projectId
);

/// <summary>Value sent as the <c>$input</c> variable of the <c>IngestUpdate</c> mutation.</summary>
public record IngestUpdateInput(
  string id,
  double? progress,
  string? progressMessage,
  string projectId
);
@@ -1,23 +0,0 @@
namespace Speckle.Sdk.Api.GraphQL.Models;
/// <summary>
/// Model for an ingest as selected by the GraphQL operations in <c>IngestResource</c>.
/// Property names are camelCase and match the field names requested by those operations.
/// </summary>
/// <remarks>
/// Every property is <see langword="required"/>, so object-initializer construction must set all of them.
/// </remarks>
public sealed class Ingest
{
public required DateTime createdAt { get; init; }
// NOTE(review): errorReason / errorStacktrace are declared non-nullable — confirm the server
// never returns null for them (e.g. for an ingest that has not errored).
public required string errorReason { get; init; }
public required string errorStacktrace { get; init; }
public required string fileName { get; init; }
public required string id { get; init; }
// NOTE(review): long here, but IngestCreateInput.maxIdleTimeoutMinutes is int? — confirm which the schema uses.
public required long maxIdleTimeoutMinutes { get; init; }
public required string modelId { get; init; }
public required Dictionary<string, object?> performanceData { get; init; }
public required double progress { get; init; }
// Declared nullable, unlike the other string properties on this type.
public required string? progressMessage { get; init; }
public required string projectId { get; init; }
public required string sourceApplication { get; init; }
public required string sourceApplicationVersion { get; init; }
public required Dictionary<string, object?> sourceFileData { get; init; }
// Kept as the raw string the server sends; no enum mapping is done here.
public required string status { get; init; }
public required DateTime updatedAt { get; init; }
// NOTE(review): non-nullable — confirm a version id always exists, even before an ingest has ended.
public required string versionId { get; init; }
public required LimitedUser user { get; init; }
}
@@ -1,253 +0,0 @@
using GraphQL;
using Speckle.Sdk.Api.GraphQL.Inputs;
using Speckle.Sdk.Api.GraphQL.Models;
using Speckle.Sdk.Api.GraphQL.Models.Responses;
using Version = Speckle.Sdk.Api.GraphQL.Models.Version;
namespace Speckle.Sdk.Api.GraphQL.Resources;
/// <summary>
/// GraphQL operations for ingests: one query listing the ingests of a model, and the
/// <c>update</c>, <c>create</c>, <c>end</c>, <c>error</c> and <c>cancel</c> ingest mutations.
/// All requests are executed through the <see cref="ISpeckleGraphQLClient"/> passed to the constructor.
/// </summary>
public sealed class IngestResource
{
private readonly ISpeckleGraphQLClient _client;
internal IngestResource(ISpeckleGraphQLClient client)
{
_client = client;
}
/// <summary>Queries the ingests of one model within a project.</summary>
/// <param name="modelId">Sent as the <c>$modelId</c> variable; selects the model inside the project</param>
/// <param name="projectId">Sent as the <c>$projectId</c> variable; selects the project</param>
/// <param name="cancellationToken">Forwarded to the GraphQL client</param>
/// <returns>The <c>ingests</c> collection (cursor and items) returned for the model</returns>
/// <inheritdoc cref="ISpeckleGraphQLClient.ExecuteGraphQLRequest{T}"/>
public async Task<ResourceCollection<Ingest>> GetIngests(
string modelId,
string projectId,
CancellationToken cancellationToken = default
)
{
// NOTE(review): this query declares $projectId as String!, while the mutations below declare it as ID! — confirm against the schema.
//language=graphql
const string QUERY = """
query GetIngest($modelId: String!, $projectId: String!) {
data:project(id: $projectId) {
data:model(id: $modelId) {
data:ingests {
cursor
items {
createdAt
errorReason
errorStacktrace
fileName
id
maxIdleTimeoutMinutes
modelId
performanceData
progress
progressMessage
projectId
sourceApplication
sourceApplicationVersion
sourceFileData
status
updatedAt
versionId
user {
avatar
bio
company
id
name
role
verified
}
}
}
}
}
}
""";
var request = new GraphQLRequest { Query = QUERY, Variables = new { modelId, projectId } };
// Three nested "data" aliases in the query map onto three nested RequiredResponse wrappers.
var response = await _client
.ExecuteGraphQLRequest<RequiredResponse<RequiredResponse<RequiredResponse<ResourceCollection<Ingest>>>>>(
request,
cancellationToken
)
.ConfigureAwait(false);
return response.data.data.data;
}
/// <summary>Executes the <c>IngestUpdate</c> mutation.</summary>
/// <param name="input">Sent as the <c>$input</c> variable; its <c>projectId</c> is also sent as <c>$projectId</c></param>
/// <param name="cancellationToken">Forwarded to the GraphQL client</param>
/// <returns>The boolean the server returns for the <c>update</c> field</returns>
/// <inheritdoc cref="ISpeckleGraphQLClient.ExecuteGraphQLRequest{T}"/>
public async Task<bool> Update(IngestUpdateInput input, CancellationToken cancellationToken = default)
{
//language=graphql
const string QUERY = """
mutation IngestUpdate($projectId: ID!, $input: IngestUpdateInput!) {
data: projectMutations {
data: ingestMutations(projectId: $projectId) {
data: update(input: $input)
}
}
}
""";
GraphQLRequest request = new() { Query = QUERY, Variables = new { input, input.projectId } };
var res = await _client
.ExecuteGraphQLRequest<RequiredResponse<RequiredResponse<RequiredResponse<bool>>>>(request, cancellationToken)
.ConfigureAwait(false);
return res.data.data.data;
}
/// <summary>Executes the <c>IngestCreate</c> mutation.</summary>
/// <param name="input">Sent as the <c>$input</c> variable; its <c>projectId</c> is also sent as <c>$projectId</c></param>
/// <param name="cancellationToken">Forwarded to the GraphQL client</param>
/// <returns>The <see cref="Ingest"/> the server returns for the <c>create</c> field</returns>
/// <inheritdoc cref="ISpeckleGraphQLClient.ExecuteGraphQLRequest{T}"/>
public async Task<Ingest> Create(IngestCreateInput input, CancellationToken cancellationToken = default)
{
//language=graphql
const string QUERY = """
mutation IngestCreate($projectId: ID!, $input: IngestCreateInput!) {
data: projectMutations {
data:ingestMutations(projectId: $projectId) {
data:create(input: $input) {
createdAt
errorReason
errorStacktrace
fileName
id
maxIdleTimeoutMinutes
modelId
performanceData
progress
progressMessage
projectId
sourceApplication
sourceApplicationVersion
sourceFileData
status
updatedAt
versionId
user {
avatar
bio
company
id
name
role
verified
}
}
}
}
}
""";
GraphQLRequest request = new() { Query = QUERY, Variables = new { input, input.projectId } };
var res = await _client
.ExecuteGraphQLRequest<RequiredResponse<RequiredResponse<RequiredResponse<Ingest>>>>(request, cancellationToken)
.ConfigureAwait(false);
return res.data.data.data;
}
/// <summary>Executes the <c>IngestEnd</c> mutation.</summary>
/// <param name="input">Sent as the <c>$input</c> variable; its <c>projectId</c> is also sent as <c>$projectId</c></param>
/// <param name="cancellationToken">Forwarded to the GraphQL client</param>
/// <returns>The <see cref="Version"/> the server returns for the <c>end</c> field</returns>
/// <inheritdoc cref="ISpeckleGraphQLClient.ExecuteGraphQLRequest{T}"/>
public async Task<Version> End(IngestFinishInput input, CancellationToken cancellationToken = default)
{
//language=graphql
const string QUERY = """
mutation IngestEnd($projectId: ID!, $input: IngestFinishInput!) {
data: projectMutations {
data:ingestMutations(projectId: $projectId) {
data:end(input: $input) {
id
referencedObject
message
sourceApplication
createdAt
previewUrl
authorUser {
id
name
bio
company
verified
role
avatar
}
}
}
}
}
""";
GraphQLRequest request = new() { Query = QUERY, Variables = new { input, input.projectId } };
var res = await _client
.ExecuteGraphQLRequest<RequiredResponse<RequiredResponse<RequiredResponse<Version>>>>(request, cancellationToken)
.ConfigureAwait(false);
return res.data.data.data;
}
/// <summary>Executes the <c>IngestError</c> mutation.</summary>
/// <param name="input">Sent as the <c>$input</c> variable; its <c>projectId</c> is also sent as <c>$projectId</c></param>
/// <param name="cancellationToken">Forwarded to the GraphQL client</param>
/// <returns>The boolean the server returns for the <c>error</c> field</returns>
/// <inheritdoc cref="ISpeckleGraphQLClient.ExecuteGraphQLRequest{T}"/>
public async Task<bool> Error(IngestErrorInput input, CancellationToken cancellationToken = default)
{
//language=graphql
const string QUERY = """
mutation IngestError($projectId: ID!, $input: IngestErrorInput!) {
data: projectMutations {
data:ingestMutations(projectId: $projectId) {
data:error(input: $input)
}
}
}
""";
GraphQLRequest request = new() { Query = QUERY, Variables = new { input, input.projectId } };
var res = await _client
.ExecuteGraphQLRequest<RequiredResponse<RequiredResponse<RequiredResponse<bool>>>>(request, cancellationToken)
.ConfigureAwait(false);
return res.data.data.data;
}
/// <summary>Executes the <c>IngestCancel</c> mutation.</summary>
/// <param name="input">Sent as the <c>$input</c> variable; its <c>projectId</c> is also sent as <c>$projectId</c></param>
/// <param name="cancellationToken">Forwarded to the GraphQL client</param>
/// <returns>The boolean the server returns for the <c>cancel</c> field</returns>
/// <inheritdoc cref="ISpeckleGraphQLClient.ExecuteGraphQLRequest{T}"/>
public async Task<bool> Cancel(CancelRequestInput input, CancellationToken cancellationToken = default)
{
//language=graphql
const string QUERY = """
mutation IngestCancel($projectId: ID!, $input: CancelRequestInput!) {
data:projectMutations {
data:ingestMutations(projectId: $projectId) {
data:cancel(input: $input)
}
}
}
""";
GraphQLRequest request = new() { Query = QUERY, Variables = new { input, input.projectId } };
var res = await _client
.ExecuteGraphQLRequest<RequiredResponse<RequiredResponse<RequiredResponse<bool>>>>(request, cancellationToken)
.ConfigureAwait(false);
return res.data.data.data;
}
}
+65 -22
View File
@@ -1,5 +1,6 @@
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Text;
#if NET6_0_OR_GREATER
@@ -8,47 +9,58 @@ using System.Runtime.InteropServices;
namespace Speckle.Sdk.Common;
/// <summary>
/// Helpers for hashing data to a hex string
/// </summary>
public static class Sha256
{
public const string DEFAULT_FORMAT = "x2";
public const int HASH_SIZE_CHARS = 64; // SHA256.HashSizeInBytes * sizeof(char)
#if NET6_0_OR_GREATER
/// <param name="input">the value to hash</param>
/// <param name="format"><c>"x2"</c> for lower case, <c>"X2"</c> for uppercase.</param>
/// <param name="length">Desired length of the returned string. Must be 2 &#x2264; Length &#x2264; 64, and must be a multiple of 2</param>
/// <returns><inheritdoc cref="GetString(string, string?, int)"/></returns>
[Pure]
public static string GetString(
ReadOnlySpan<char> input,
[StringSyntax(StringSyntaxAttribute.NumericFormat)] string? format = "x2",
int length = SHA256.HashSizeInBytes * sizeof(char)
)
/// <summary>
/// Hashes the in-memory bytes of <paramref name="input"/> with SHA256 and writes the hex digest to <paramref name="destination"/>.
/// </summary>
/// <param name="input">the value to hash; its chars are reinterpreted as bytes without re-encoding</param>
/// <param name="formatUpperCase"><see langword="true"/> for upper case, false otherwise</param>
/// <param name="destination">Output hash; it must have <c>2 &#x2264; Length &#x2264; 64</c>, and must be a multiple of 2</param>
public static void Hash(ReadOnlySpan<char> input, bool formatUpperCase, Span<char> destination) =>
  Hash(MemoryMarshal.AsBytes(input), formatUpperCase, destination);
public static void Hash(ReadOnlySpan<byte> input, bool formatUpperCase, Span<char> destination)
{
Span<byte> hash = stackalloc byte[SHA256.HashSizeInBytes];
SHA256.HashData(inputBytes, hash);
SHA256.HashData(input, hash);
Span<char> output = stackalloc char[length];
FormatHash(hash, formatUpperCase, destination);
}
for (int i = 0, j = 0; j < length; i += sizeof(byte), j += sizeof(char))
/// <summary>
/// Hashes the contents of <paramref name="source"/> with SHA256 and writes the hex digest to <paramref name="destination"/>.
/// </summary>
/// <param name="source">stream to hash</param>
/// <param name="formatUpperCase"><see langword="true"/> for upper case, false otherwise</param>
/// <param name="destination">receives the formatted digest; its length decides how many chars are written</param>
public static void Hash(Stream source, bool formatUpperCase, Span<char> destination)
{
  // NOTE(review): SHA256.HashData(Stream, Span<byte>) was added in .NET 7, but the surrounding
  // guard is NET6_0_OR_GREATER — confirm there is no net6.0 target.
  Span<byte> digest = stackalloc byte[SHA256.HashSizeInBytes];
  SHA256.HashData(source, digest);
  FormatHash(digest, formatUpperCase, destination);
}
private static void FormatHash(ReadOnlySpan<byte> input, bool formatUpperCase, Span<char> output)
{
for (int i = 0, j = 0; j < output.Length; i += sizeof(byte), j += sizeof(char))
{
hash[i].TryFormat(output[j..], out _, format);
input[i].TryFormat(output[j..], out _, formatUpperCase ? "X2" : "x2");
}
return new string(output);
}
#endif
/// <param name="input">the value to hash</param>
/// <param name="format"><c>"x2"</c> for lower case, <c>"X2"</c> for uppercase.</param>
/// <param name="length">Desired length of the returned string</param>
/// <param name="outputLengthChars">Desired length of the returned string</param>
/// <returns>the hash string</returns>
/// <exception cref="FormatException"><paramref name="format"/> is not a recognised numeric format</exception>
/// <exception cref="ArgumentOutOfRangeException"><inheritdoc cref="StringBuilder.ToString(int, int)"/></exception>
[Pure]
public static string GetString(
public static string Hash(
string input,
[StringSyntax(StringSyntaxAttribute.NumericFormat)] string? format = "x2",
int length = 64
[StringSyntax(StringSyntaxAttribute.NumericFormat)] string? format = DEFAULT_FORMAT,
int outputLengthChars = HASH_SIZE_CHARS
)
{
var inputBytes = Encoding.Unicode.GetBytes(input);
@@ -59,12 +71,43 @@ public static class Sha256
byte[] hash = sha256.ComputeHash(inputBytes);
#endif
StringBuilder sb = new(64);
StringBuilder sb = new(HASH_SIZE_CHARS);
foreach (byte b in hash)
{
sb.Append(b.ToString(format));
}
return sb.ToString(0, length);
return sb.ToString(0, outputLengthChars);
}
/// <inheritdoc cref="Hash(string, string?, int)"/>
/// <remarks>Stream overload: hashes the stream contents, then formats each hash byte with <paramref name="format"/>.</remarks>
[Pure]
public static string Hash(
  Stream input,
  [StringSyntax(StringSyntaxAttribute.NumericFormat)] string? format = DEFAULT_FORMAT,
  int outputLengthChars = HASH_SIZE_CHARS
)
{
#if NET6_0_OR_GREATER
  // NOTE(review): the Stream overload of SHA256.HashData was added in .NET 7 — confirm there is no net6.0 target.
  return FormatHash(SHA256.HashData(input), format, outputLengthChars);
#else
  using var hasher = SHA256.Create();
  return FormatHash(hasher.ComputeHash(input), format, outputLengthChars);
#endif
}
/// <summary>
/// Formats every byte of <paramref name="hash"/> with <paramref name="format"/>, concatenates the results
/// and returns the first <paramref name="outputLengthChars"/> characters.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="outputLengthChars"/> exceeds the formatted length</exception>
[Pure]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static string FormatHash(byte[] hash, string? format, int outputLengthChars)
{
  StringBuilder hex = new(HASH_SIZE_CHARS);
  for (int index = 0; index < hash.Length; index++)
  {
    hex.Append(hash[index].ToString(format));
  }

  // Truncation happens on the formatted text, not on the raw bytes.
  return hex.ToString(0, outputLengthChars);
}
}
+12 -11
View File
@@ -1,38 +1,39 @@
using System.Runtime.Serialization;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.Serialization;
using Speckle.Newtonsoft.Json;
namespace Speckle.Sdk.Models;
[SpeckleType("Speckle.Core.Models.Blob")]
public class Blob : Base
public sealed class Blob : Base
{
[JsonIgnore]
public static int LocalHashPrefixLength => 20;
private string _filePath;
private string _hash;
private string? _hash;
private bool _isHashExpired = true;
public Blob() { }
[SetsRequiredMembers]
public Blob(string filePath)
{
this.filePath = filePath;
this.originalPath = filePath;
}
public string filePath
public required string filePath
{
get => _filePath;
set
{
originalPath ??= value;
_filePath = value;
_isHashExpired = true;
}
}
public required string originalPath { get; set; }
public string originalPath { get; set; }
[JsonIgnore]
public FileInfo FileInfo => new(filePath);
/// <summary>
/// For blobs, the id is the same as the file hash. Please note, when deserialising, the id will be set from the original hash generated on sending.
@@ -45,9 +46,9 @@ public class Blob : Base
public string? GetFileHash()
{
if ((_isHashExpired || _hash == null) && filePath != null)
if ((_isHashExpired || _hash == null))
{
_hash = HashUtility.HashFile(filePath);
_hash = HashUtility.CalculateBlobHash(filePath);
}
return _hash;
+27 -14
View File
@@ -1,26 +1,39 @@
using System.Diagnostics.CodeAnalysis;
using System.Security.Cryptography;
using System.Diagnostics.Contracts;
using Speckle.Sdk.Common;
using Speckle.Sdk.Serialisation;
namespace Speckle.Sdk.Models;
/// <summary>
/// Helper functions for calculating hash based Ids for Speckle core concepts
/// </summary>
public static class HashUtility
{
public enum HashingFunctions
public const int HASH_LENGTH_CHARS = 32;
[Pure]
public static Id ComputeObjectId(Json serialized)
{
SHA256,
MD5,
#if NET6_0_OR_GREATER
Span<char> hash = stackalloc char[HASH_LENGTH_CHARS];
Sha256.Hash(serialized.Value.AsSpan(), false, hash);
return new Id(new string(hash));
#else
string hash = Sha256.Hash(serialized.Value, outputLengthChars: HashUtility.HASH_LENGTH_CHARS);
return new Id(hash);
#endif
}
public const int HASH_LENGTH = 32;
[SuppressMessage("Security", "CA5351:Do Not Use Broken Cryptographic Algorithms")]
public static string HashFile(string filePath, HashingFunctions func = HashingFunctions.SHA256)
[Pure]
public static string CalculateBlobHash(string filePath)
{
using HashAlgorithm hashAlgorithm = func == HashingFunctions.MD5 ? MD5.Create() : SHA256.Create();
using var stream = File.OpenRead(filePath);
var hash = hashAlgorithm.ComputeHash(stream);
return BitConverter.ToString(hash, 0, HASH_LENGTH).Replace("-", "").ToLowerInvariant();
#if NET6_0_OR_GREATER
Span<char> hash = stackalloc char[HASH_LENGTH_CHARS];
Sha256.Hash(stream, false, hash);
return new(hash);
#else
return Sha256.Hash(stream, "x2", HASH_LENGTH_CHARS);
#endif
}
}
@@ -1,19 +0,0 @@
using System.Diagnostics.Contracts;
using Speckle.Sdk.Common;
using Speckle.Sdk.Models;
namespace Speckle.Sdk.Serialisation;
/// <summary>Computes object ids from serialized JSON.</summary>
public static class IdGenerator
{
  /// <summary>
  /// Returns an <see cref="Id"/> wrapping the SHA256 hex string of <paramref name="serialized"/>,
  /// limited to <see cref="HashUtility.HASH_LENGTH"/> characters.
  /// </summary>
  /// <param name="serialized">the serialized JSON whose text is hashed</param>
  [Pure]
  public static Id ComputeId(Json serialized)
  {
#if NET6_0_OR_GREATER
    // Span overload avoids passing the string itself on newer runtimes.
    return new Id(Sha256.GetString(serialized.Value.AsSpan(), length: HashUtility.HASH_LENGTH));
#else
    return new Id(Sha256.GetString(serialized.Value, length: HashUtility.HASH_LENGTH));
#endif
  }
}
@@ -358,7 +358,7 @@ public class SpeckleObjectSerializer
if (writer is SerializerIdWriter serializerIdWriter)
{
(var json, writer) = serializerIdWriter.FinishIdWriter();
id = IdGenerator.ComputeId(json);
id = HashUtility.ComputeObjectId(json);
}
else
{
@@ -1,12 +1,13 @@
using System.Text;
using Speckle.Sdk.Models;
namespace Speckle.Sdk.Serialisation.V2.Send;
public sealed record BaseItem(Id Id, Json Json, bool NeedsStorage, Dictionary<Id, int>? Closures) : IHasByteSize
public record BaseItem(Id Id, Json Json, bool NeedsStorage, Dictionary<Id, int>? Closures) : IHasByteSize
{
public int ByteSize { get; } = Encoding.UTF8.GetByteCount(Json.Value);
public virtual int ByteSize { get; } = Encoding.UTF8.GetByteCount(Json.Value);
public bool Equals(BaseItem? other)
public virtual bool Equals(BaseItem? other)
{
if (other is null)
{
@@ -17,3 +18,10 @@ public sealed record BaseItem(Id Id, Json Json, bool NeedsStorage, Dictionary<Id
public override int GetHashCode() => Id.GetHashCode();
}
/// <summary>
/// A <see cref="BaseItem"/> that additionally carries the <see cref="Models.Blob"/> it was created from.
/// Its <see cref="ByteSize"/> is the size of the blob's file rather than the size of the JSON.
/// </summary>
public sealed record BlobItem(Id Id, Json Json, bool NeedsStorage, Dictionary<Id, int>? Closures, Blob Blob)
  : BaseItem(Id, Json, NeedsStorage, Closures)
{
  public Blob Blob { get; } = Blob;

  /// <summary>
  /// File length in bytes, read once at construction and capped at <see cref="int.MaxValue"/>.
  /// </summary>
  /// <remarks>
  /// <c>FileInfo.Length</c> is a <see langword="long"/>; a plain <c>(int)</c> cast is unchecked and would
  /// wrap (possibly to a negative size) for files larger than <see cref="int.MaxValue"/> bytes.
  /// </remarks>
  public override int ByteSize { get; } = (int)Math.Min(Blob.FileInfo.Length, int.MaxValue);
}
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using Speckle.Sdk.Common;
using Speckle.Sdk.Dependencies;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.SQLite;
@@ -9,7 +10,13 @@ namespace Speckle.Sdk.Serialisation.V2.Send;
public interface IObjectSaver : IDisposable
{
Exception? Exception { get; set; }
Task Start(int? maxParallelism, int? httpBatchSize, int? cacheBatchSize, CancellationToken cancellationToken);
Task Start(
int? maxParallelism,
int? httpBatchSize,
int? blobBatchSize,
int? cacheBatchSize,
CancellationToken cancellationToken
);
void DoneTraversing();
Task DoneSaving();
Task SaveAsync(BaseItem item);
@@ -19,14 +26,11 @@ public sealed class ObjectSaver(
IProgress<ProgressArgs>? progress,
ISqLiteJsonCacheManager sqLiteJsonCacheManager,
IServerObjectManager serverObjectManager,
IServerBlobManager? serverBlobManager,
ILogger<ObjectSaver> logger,
SerializeProcessOptions options,
CancellationToken cancellationToken
#pragma warning disable CS9107
#pragma warning disable CA2254
) : ChannelSaver<BaseItem>, IObjectSaver
#pragma warning restore CA2254
#pragma warning restore CS9107
) : ChannelSaver<BaseItem, BlobItem>, IObjectSaver
{
private readonly CancellationTokenSource _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken
@@ -40,6 +44,24 @@ public sealed class ObjectSaver(
private long _objectsSerialized;
private bool _disposed;
/// <summary>
/// Uploads the distinct blobs of <paramref name="batch"/> through the configured blob manager.
/// Does nothing (after the blob-manager check) when the batch holds no items.
/// </summary>
protected override async Task SendBlobToServerInternal(Batch<BlobItem> batch)
{
  // Callers should either setup a blob manager, or not try and send blobs
  serverBlobManager.NotNull("No blob manager was setup to handle sending blobs");

  var uploads = batch
    .Items.Distinct()
    .Select(item => (item.Blob.id.NotNull(), item.Blob.filePath))
    .ToList();

  // TODO: not implemented yet — skipping blobs the server already has (a HasObjects-style check)
  // and reporting upload progress for the batch.
  if (uploads.Count == 0)
  {
    return;
  }

  await serverBlobManager.UploadBlobs(uploads, progress, _cancellationTokenSource.Token).ConfigureAwait(false);
}
protected override async Task SendToServerInternal(Batch<BaseItem> batch)
{
if (IsCancelled())
@@ -343,7 +343,7 @@ public sealed class ObjectSerializer : IObjectSerializer
if (writer is SerializerIdWriter serializerIdWriter)
{
(var json, writer) = serializerIdWriter.FinishIdWriter();
id = IdGenerator.ComputeId(json);
id = HashUtility.ComputeObjectId(json);
}
else
{
@@ -17,6 +17,7 @@ public record SerializeProcessOptions(
{
public int? MaxHttpSendBatchSize { get; set; }
public int? MaxCacheBatchSize { get; set; }
public int? MaxBlobBatchSize { get; set; }
public int? MaxParallelism { get; set; }
}
@@ -109,6 +110,7 @@ public sealed class SerializeProcess(
var channelTask = objectSaver.Start(
options.MaxParallelism,
options.MaxHttpSendBatchSize,
options.MaxBlobBatchSize,
options.MaxCacheBatchSize,
_processSource.Token
);
@@ -13,26 +13,36 @@ public class SerializeProcessFactory(
IObjectSerializerFactory objectSerializerFactory,
ISqLiteJsonCacheManagerFactory sqLiteJsonCacheManagerFactory,
IServerObjectManagerFactory serverObjectManagerFactory,
IServerBlobManagerFactory serverBlobManagerFactory,
ILoggerFactory loggerFactory
) : ISerializeProcessFactory
{
public ISerializeProcess CreateSerializeProcess(
Uri url,
string streamId,
string projectId,
string? authorizationToken,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken,
SerializeProcessOptions? options = null
)
{
var sqLiteJsonCacheManager = sqLiteJsonCacheManagerFactory.CreateFromStream(streamId);
var serverObjectManager = serverObjectManagerFactory.Create(url, streamId, authorizationToken);
return CreateSerializeProcess(sqLiteJsonCacheManager, serverObjectManager, progress, cancellationToken, options);
var sqLiteJsonCacheManager = sqLiteJsonCacheManagerFactory.CreateFromStream(projectId);
var serverObjectManager = serverObjectManagerFactory.Create(url, projectId, authorizationToken);
var serverBlobManager = serverBlobManagerFactory.Create(url, projectId, authorizationToken);
return CreateSerializeProcess(
sqLiteJsonCacheManager,
serverObjectManager,
serverBlobManager,
progress,
cancellationToken,
options
);
}
public ISerializeProcess CreateSerializeProcess(
ISqLiteJsonCacheManager sqLiteJsonCacheManager,
IServerObjectManager serverObjectManager,
IServerBlobManager? serverBlobManager,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken,
SerializeProcessOptions? options = null
@@ -43,6 +53,7 @@ public class SerializeProcessFactory(
progress,
sqLiteJsonCacheManager,
serverObjectManager,
serverBlobManager,
loggerFactory.CreateLogger<ObjectSaver>(),
options ?? new SerializeProcessOptions(),
cancellationToken
@@ -68,6 +79,7 @@ public class SerializeProcessFactory(
return CreateSerializeProcess(
memoryJsonCacheManager,
new MemoryServerObjectManager(objects),
null!, //this would need a better solution
progress,
cancellationToken,
options
@@ -0,0 +1,20 @@
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Helpers;
namespace Speckle.Sdk.Serialisation.V2;
/// <summary>
/// Creates <see cref="IServerBlobManager"/> instances backed by an <see cref="HttpClient"/>
/// obtained from <see cref="ISpeckleHttp"/>.
/// </summary>
[GenerateAutoInterface]
public sealed class ServerBlobManagerFactory(ISpeckleHttp speckleHttp) : IServerBlobManagerFactory
{
  /// <param name="serverUrl">Set as the client's <see cref="HttpClient.BaseAddress"/></param>
  /// <param name="projectId">Passed through to the created blob manager</param>
  /// <param name="authorizationToken">Passed to <see cref="ISpeckleHttp"/> when creating the client</param>
  /// <param name="timeout">
  /// Request timeout for the created client. When <see langword="null"/> the timeout of the client
  /// returned by <see cref="ISpeckleHttp"/> is left untouched.
  /// </param>
  public IServerBlobManager Create(
    Uri serverUrl,
    string projectId,
    string? authorizationToken,
    TimeSpan? timeout = null
  )
  {
    var client = speckleHttp.CreateHttpClient(authorizationToken: authorizationToken);
    client.BaseAddress = serverUrl;

    // Previously the timeout argument was accepted but silently ignored.
    if (timeout is { } requestTimeout)
    {
      client.Timeout = requestTimeout;
    }

    return new ServerBlobManager(client, projectId);
  }
}
@@ -0,0 +1,41 @@
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Transports;
using Speckle.Sdk.Transports.ServerUtils;
namespace Speckle.Sdk.Serialisation.V2;
/// <summary>
/// Uploads blob files to a project as a single multipart POST using an already-authorised <see cref="HttpClient"/>.
/// </summary>
[GenerateAutoInterface(VisibilityModifier = "public")]
internal sealed class ServerBlobManager(HttpClient authorizedClient, string projectId) : IServerBlobManager
{
  /// <summary>
  /// Posts every file in <paramref name="objects"/> to <c>/api/stream/{projectId}/blob</c> in one request.
  /// Returns immediately when <paramref name="objects"/> is empty.
  /// </summary>
  /// <param name="objects">Pairs of blob id and path of the file to upload; each part is named <c>hash:{blobId}</c></param>
  /// <param name="progress">Passed to the <see cref="ProgressContent"/> wrapping the request body</param>
  /// <param name="cancellationToken">Checked before each file is opened and passed to the HTTP send</param>
  /// <exception cref="HttpRequestException">The server responded with a non-success status code</exception>
  public async Task UploadBlobs(
    IReadOnlyCollection<(string blobId, string filePath)> objects,
    IProgress<ProgressArgs>? progress,
    CancellationToken cancellationToken
  )
  {
    if (objects.Count == 0)
    {
      return;
    }

    // Disposing the multipart content disposes every added StreamContent and its file stream,
    // so files opened so far are released even if a later file is missing or the token is cancelled.
    using var multipartFormDataContent = new MultipartFormDataContent();
    foreach (var (id, filePath) in objects)
    {
      // Check before opening so a cancelled upload does not open another file handle first.
      cancellationToken.ThrowIfCancellationRequested();

      var fileName = Path.GetFileName(filePath);
      var stream = File.OpenRead(filePath);
      StreamContent fsc = new(stream);
      multipartFormDataContent.Add(fsc, $"hash:{id}", fileName);
    }

    using var message = new HttpRequestMessage();
    message.RequestUri = new Uri($"/api/stream/{projectId}/blob", UriKind.Relative);
    message.Method = HttpMethod.Post;
    message.Content = new ProgressContent(multipartFormDataContent, progress);

    using var response = await authorizedClient.SendAsync(message, cancellationToken).ConfigureAwait(false);
    response.EnsureSuccessStatusCode();
  }
}
@@ -59,6 +59,7 @@ public class CancellationTests
new DummySqLiteSendManager(),
new CancellationServerObjectManager(cancellationSource),
null,
null,
cancellationSource.Token,
new SerializeProcessOptions(true, true, false, true)
);
@@ -79,6 +80,7 @@ public class CancellationTests
new DummySqLiteSendManager(),
new CancellationServerObjectManager(cancellationSource),
null,
null,
cancellationSource.Token,
new SerializeProcessOptions(true, true, false, true)
);
@@ -40,6 +40,7 @@ public class DataObjectTests
new MemoryJsonCacheManager(json),
new DummyServerObjectManager(),
null,
null,
default,
new SerializeProcessOptions(false, false, true, true)
);
@@ -37,6 +37,7 @@ public class ExceptionTests
new MemoryJsonCacheManager(objects),
new ExceptionServerObjectManager(),
null,
null,
default,
new SerializeProcessOptions(false, false, false, true)
);
@@ -55,6 +56,7 @@ public class ExceptionTests
new ExceptionSendCacheManager(),
new MemoryServerObjectManager(new()),
null,
null,
default,
new SerializeProcessOptions(false, false, false, true)
);
@@ -92,6 +94,7 @@ public class ExceptionTests
new ExceptionSendCacheManager(exceptionsAfter: 10),
new MemoryServerObjectManager(new()),
null,
null,
default,
new SerializeProcessOptions(false, false, false, true)
{
@@ -146,7 +146,7 @@ public class SerializationTests
jObject.Remove("id");
jObject.Remove("__closure");
var jsonWithoutId = jObject.ToString(Formatting.None);
var newId = IdGenerator.ComputeId(new Json(jsonWithoutId));
var newId = HashUtility.ComputeObjectId(new Json(jsonWithoutId));
id.Should().Be(newId.Value);
}
@@ -227,6 +227,7 @@ public class SerializationTests
SqLiteJsonCacheManager.FromMemory(1),
new MemoryServerObjectManager(newIdToJson),
null,
null,
default,
new SerializeProcessOptions(false, false, false, true) { MaxCacheBatchSize = 1, MaxParallelism = concurrency }
)
@@ -60,7 +60,7 @@ public class BlobApiExceptionalTests : IAsyncLifetime
{
await writer.WriteLineAsync(PAYLOAD);
}
string id = HashUtility.HashFile(filePath);
string id = HashUtility.CalculateBlobHash(filePath);
var ex = await Assert.ThrowsAsync<HttpRequestException>(async () =>
await _sut.UploadBlobs("non-existent-project", [(id, filePath)], null, CancellationToken.None)
);
@@ -34,7 +34,7 @@ public class BlobApiTests : IAsyncLifetime
{
await writer.WriteLineAsync(PAYLOAD);
}
string id = HashUtility.HashFile(filePath);
string id = HashUtility.CalculateBlobHash(filePath);
//act
var preDiff = await _blobApi.HasBlobs(_project.id, [id], CancellationToken.None);
@@ -1,122 +0,0 @@
using System.Reflection;
using Microsoft.Extensions.DependencyInjection;
using Speckle.Sdk.Api;
using Speckle.Sdk.Api.GraphQL.Inputs;
using Speckle.Sdk.Api.GraphQL.Models;
using Speckle.Sdk.Api.GraphQL.Resources;
using Speckle.Sdk.Host;
using Speckle.Sdk.Models;
using Speckle.Sdk.Transports;
namespace Speckle.Sdk.Tests.Integration.API.GraphQL.Resources;
/// <summary>
/// Integration tests driving the ingest resource of a seeded test user's client:
/// create an ingest, then error it, cancel it, or finish it after a send.
/// </summary>
public class IngestResourceTests : IAsyncLifetime
{
  private IClient _testUser;
  private Project _project;
  private Model _model;
  private IOperations _operations;

  private IngestResource Sut => _testUser.Ingest;

  public Task DisposeAsync() => Task.CompletedTask;

  /// <summary>Resets type loading, then seeds a user, a project and a model for each test.</summary>
  public async Task InitializeAsync()
  {
    TypeLoader.Reset();
    TypeLoader.Initialize(typeof(Base).Assembly, Assembly.GetExecutingAssembly());

    var services = TestServiceSetup.GetServiceProvider();
    _operations = services.GetRequiredService<IOperations>();

    _testUser = await Fixtures.SeedUserWithClient();
    _project = await _testUser.Project.Create(new("Test project", "", null));
    _model = await _testUser.Model.Create(new("Test Model 1", "", _project.id));
  }

  /// <summary>Builds the create-input shared by every test, targeting the seeded model and project.</summary>
  private IngestCreateInput NewCreateInput() =>
    new("myTestFile", 1, _model.id, _project.id, ".NET", "0.0.0", new Dictionary<string, object?>());

  [Fact]
  public async Task CreateAndError()
  {
    Ingest ingest = await Sut.Create(NewCreateInput());

    var errorInput = new IngestErrorInput("A bad thing happened", "Over hear!", ingest.id, _project.id);
    var succeeded = await Sut.Error(errorInput);
    Assert.True(succeeded);

    var ingests = await Sut.GetIngests(_model.id, _project.id);
    await Verify(ingests);
  }

  [Fact]
  public async Task CreateAndCancel()
  {
    Ingest ingest = await Sut.Create(NewCreateInput());

    var cancelInput = new CancelRequestInput(ingest.id, _project.id);
    var succeeded = await Sut.Cancel(cancelInput);
    Assert.True(succeeded);

    var ingests = await Sut.GetIngests(_model.id, _project.id);
    await Verify(ingests);
  }

  [Fact]
  public async Task CreateAndEnd()
  {
    Ingest ingest = await Sut.Create(NewCreateInput());
    var payload = Fixtures.GenerateNestedObject();

    var sendResult = await _operations.Send2(
      _testUser.ServerUrl,
      _project.id,
      _testUser.Account.token,
      payload,
      new Progress<ProgressArgs>(args =>
      {
        // NOTE(review): if Count and Total are both integral this division truncates — confirm intended.
        var updateInput = new IngestUpdateInput(
          ingest.id,
          args.Total == null ? null : args.Count / args.Total,
          $"{args.Count} / {args.Total}",
          _project.id
        );
        // NOTE(review): blocks on the async update from inside the progress callback — confirm this cannot deadlock.
        _ = Sut.Update(updateInput).Result;
      }),
      CancellationToken.None,
      new(true, true)
    );

    var finishInput = new IngestFinishInput(ingest.id, "Yay! we completed", sendResult.RootId, _project.id);
    var finished = await Sut.End(finishInput);
    Assert.NotNull(finished);

    var ingests = await Sut.GetIngests(_model.id, _project.id);
    await Verify(ingests);
  }
}
@@ -19,12 +19,14 @@ public class CryptSha256Hash
[Benchmark]
public string Sha256()
{
return Speckle.Sdk.Common.Sha256.GetString(testData);
return Speckle.Sdk.Common.Sha256.Hash(testData);
}
[Benchmark]
public string Sha256_Span()
{
return Speckle.Sdk.Common.Sha256.GetString(testData.AsSpan());
Span<char> resultLowerSpan = stackalloc char[Speckle.Sdk.Common.Sha256.HASH_SIZE_CHARS];
Speckle.Sdk.Common.Sha256.Hash(testData.AsSpan(), false, resultLowerSpan);
return new string(resultLowerSpan);
}
}
@@ -69,8 +69,8 @@ public sealed class HashUtilityTests
[MemberData(nameof(SmallTestCasesSha256))]
public void Sha256(string input, string expected, string _, int length)
{
var resultLower = Speckle.Sdk.Common.Sha256.GetString(input, "x2", length);
var resultUpper = Speckle.Sdk.Common.Sha256.GetString(input, "X2", length);
var resultLower = Speckle.Sdk.Common.Sha256.Hash(input, "x2", length);
var resultUpper = Speckle.Sdk.Common.Sha256.Hash(input, "X2", length);
resultLower.Should().Be(new string(expected.ToLower()[..length]));
@@ -86,19 +86,22 @@ public sealed class HashUtilityTests
int length //Span version of the function must have multiple of 2
)
{
var resultLowerSpan = Speckle.Sdk.Common.Sha256.GetString(input.AsSpan(), "x2", length);
var resultUpperSpan = Speckle.Sdk.Common.Sha256.GetString(input.AsSpan(), "X2", length);
Span<char> resultLowerSpan = stackalloc char[length];
Speckle.Sdk.Common.Sha256.Hash(input.AsSpan(), false, resultLowerSpan);
Span<char> resultUpperSpan = stackalloc char[length];
Speckle.Sdk.Common.Sha256.Hash(input.AsSpan(), true, resultUpperSpan);
resultLowerSpan.Should().Be(new string(expected.ToLower()[..length]));
new string(resultLowerSpan).Should().Be(new string(expected.ToLower()[..length]));
resultUpperSpan.Should().Be(new string(expected.ToUpper()[..length]));
new string(resultUpperSpan).Should().Be(new string(expected.ToUpper()[..length]));
}
[Theory]
[MemberData(nameof(LargeTestCases))]
public void Sha256_LargeDataTests(string input, string expected)
public void Sha256_Span_LargeDataTests(string input, string expected)
{
var computedHash = Speckle.Sdk.Common.Sha256.GetString(input.AsSpan());
computedHash.Should().Be(expected);
Span<char> output = stackalloc char[Speckle.Sdk.Common.Sha256.HASH_SIZE_CHARS];
Speckle.Sdk.Common.Sha256.Hash(input.AsSpan(), false, output);
new string(output).Should().Be(expected);
}
}