using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; using System.Threading; using System.Threading.Tasks; using System.Windows.Threading; using Xunit.Runner.Data; namespace Xunit.Runner.Wpf.Impl { internal partial class RemoteTestUtil { private sealed class BackgroundWriter { private readonly ClientWriter _writer; private readonly ImmutableArray _data; private readonly Action _writeValue; private readonly CancellationToken _cancellationToken; internal BackgroundWriter(ClientWriter writer, ImmutableArray data, Action writeValue, CancellationToken cancellationToken) { _writer = writer; _writeValue = writeValue; _data = data; _cancellationToken = cancellationToken; } internal Task WriteAsync() { return Task.Run(() => GoOnBackground(), _cancellationToken); } private void GoOnBackground() { foreach (var item in _data) { if (_cancellationToken.IsCancellationRequested) { break; } _writer.Write(TestDataKind.Value); _writeValue(_writer, item); } _writer.Write(TestDataKind.EndOfData); } } /// /// Utility for reading a collection of values from the given /// value. /// /// private sealed class BackgroundReader where T : class { private readonly ConcurrentQueue _queue; private readonly ClientReader _reader; private readonly Func _readValue; internal ClientReader Reader => _reader; internal BackgroundReader(ConcurrentQueue queue, ClientReader reader, Func readValue) { _queue = queue; _reader = reader; _readValue = readValue; } internal Task ReadAsync(CancellationToken cancellationToken = default(CancellationToken)) { return Task.Run(() => GoOnBackground(cancellationToken), cancellationToken); } /// /// This will be called on a background thread to read the results of the test from the /// named pipe client stream. /// /// private void GoOnBackground(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { try { var kind = _reader.ReadKind(); if (kind != TestDataKind.Value) { break; } var value = _readValue(_reader); _queue.Enqueue(value); } catch { // TODO: Happens when the connection unexpectedly closes on us. Need to surface this // to the user. break; } } // Signal we are done _queue.Enqueue(null); } } private sealed class BackgroundProducer where T : class { private const int MaxResultPerTick = 1000; private readonly Connection _connection; private readonly ConcurrentQueue _queue; private readonly DispatcherTimer _timer; private readonly Action> _callback; private readonly int _maxPerTick; private readonly TaskCompletionSource _taskCompletionSource; internal Task Task => _taskCompletionSource.Task; internal BackgroundProducer( Connection connection, Dispatcher dispatcher, ConcurrentQueue queue, Action> callback, int maxResultPerTick = MaxResultPerTick, TimeSpan? interval = null) { _connection = connection; _queue = queue; _maxPerTick = maxResultPerTick; _callback = callback; _timer = new DispatcherTimer( interval ?? TimeSpan.FromMilliseconds(500), DispatcherPriority.Normal, OnTimerTick, dispatcher); _taskCompletionSource = new TaskCompletionSource(); } private void OnTimerTick(object sender, EventArgs e) { var i = 0; var list = new List(); var isDone = false; T value; while (i < _maxPerTick && _queue.TryDequeue(out value)) { if (value == null) { isDone = true; break; } list.Add(value); } _callback(list); if (isDone) { try { _timer.Stop(); _connection.Dispose(); } finally { _taskCompletionSource.SetResult(true); } } } } } }