diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 85a98f0e..b2e8360e 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -62,6 +62,9 @@ + + + diff --git a/src/Directory.build.targets b/src/Directory.build.targets index f2fe77ec..633bd8af 100644 --- a/src/Directory.build.targets +++ b/src/Directory.build.targets @@ -15,26 +15,35 @@ $(DefineConstants);NETSTANDARD;PORTABLE;P_LINQ;SUPPORTS_BINDINGLIST + + $(DefineConstants);SUPPORTS_ASYNC_DISPOSABLE + + - $(DefineConstants);NETCOREAPP;P_LINQ;SUPPORTS_BINDINGLIST + $(DefineConstants);NETCOREAPP;P_LINQ;SUPPORTS_BINDINGLIST;SUPPORTS_ASYNC_DISPOSABLE + + + + $(DefineConstants);SUPPORTS_ASYNC_DISPOSABLE - $(DefineConstants);NETSTANDARD;PORTABLE;P_LINQ;SUPPORTS_BINDINGLIST + $(DefineConstants);NETSTANDARD;PORTABLE;P_LINQ;SUPPORTS_BINDINGLIST;SUPPORTS_ASYNC_DISPOSABLE - $(DefineConstants);NETSTANDARD;PORTABLE;P_LINQ;SUPPORTS_BINDINGLIST + $(DefineConstants);NETSTANDARD;PORTABLE;P_LINQ;SUPPORTS_BINDINGLIST;SUPPORTS_ASYNC_DISPOSABLE - $(DefineConstants);NETSTANDARD;PORTABLE;P_LINQ;SUPPORTS_BINDINGLIST + $(DefineConstants);NETSTANDARD;PORTABLE;P_LINQ;SUPPORTS_BINDINGLIST;SUPPORTS_ASYNC_DISPOSABLE - $(DefineConstants);NETSTANDARD;PORTABLE;P_LINQ;SUPPORTS_BINDINGLIST + $(DefineConstants);NETSTANDARD;PORTABLE;P_LINQ;SUPPORTS_BINDINGLIST;SUPPORTS_ASYNC_DISPOSABLE + - $(DefineConstants);NETSTANDARD;PORTABLE;P_LINQ;SUPPORTS_BINDINGLIST + $(DefineConstants);NETSTANDARD;PORTABLE;P_LINQ;SUPPORTS_BINDINGLIST;SUPPORTS_ASYNC_DISPOSABLE diff --git a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet9_0.verified.txt b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet9_0.verified.txt index e594d8a7..fba40fd2 100644 --- a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet9_0.verified.txt +++ b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet9_0.verified.txt @@ -1137,6 +1137,9 @@ namespace DynamicData public static DynamicData.IObservableCache AsObservableCache(this System.IObservable> source, bool applyLocking = true) where TObject : notnull where TKey : notnull { } + public static System.IObservable> AsyncDisposeMany(this System.IObservable> source, System.Action> disposalsCompletedAccessor) + where TObject : notnull + where TKey : notnull { } public static System.IObservable> AutoRefresh(this System.IObservable> source, System.TimeSpan? changeSetBuffer = default, System.TimeSpan? propertyChangeThrottle = default, System.Reactive.Concurrency.IScheduler? scheduler = null) where TObject : System.ComponentModel.INotifyPropertyChanged where TKey : notnull { } diff --git a/src/DynamicData.Tests/Cache/AsyncDisposeManyFixture.cs b/src/DynamicData.Tests/Cache/AsyncDisposeManyFixture.cs new file mode 100644 index 00000000..b87fbb70 --- /dev/null +++ b/src/DynamicData.Tests/Cache/AsyncDisposeManyFixture.cs @@ -0,0 +1,723 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; + +using DynamicData.Cache.Internal; +using DynamicData.Kernel; +using DynamicData.Tests.Utilities; + +using FluentAssertions; +using FluentAssertions.Equivalency.Steps; + +using Xunit; +using Xunit.Abstractions; + +namespace DynamicData.Tests.Cache; + +public class AsyncDisposeManyFixture +{ + public enum SourceType + { + Subject, + Immediate + } + + public enum ItemType + { + Plain, + Disposable, + AsyncDisposable, + ImmediateAsyncDisposable + } + + [Theory] + [InlineData(ItemType.Disposable)] + [InlineData(ItemType.AsyncDisposable)] + [InlineData(ItemType.ImmediateAsyncDisposable)] + public async Task ItemDisposalErrors_ErrorPropagatesToDisposalsCompleted(ItemType itemType) + { + using var source = new SourceCache(static item => item.Id); + using var sourceCompletionSource = new Subject(); + + ValueRecordingObserver? disposalsCompletedResults = null; + + using var subscription = source + .Connect() + .TakeUntil(sourceCompletionSource) + .AsyncDisposeMany(disposalsCompleted => + { + disposalsCompletedResults.Should().BeNull("disposalsCompletedAccessor should only be invoked once per subscription"); + disposalsCompleted.RecordValues(out disposalsCompletedResults); + }) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + disposalsCompletedResults.Should().NotBeNull("disposalsCompletedAccessor should have been invoked"); + + + source.AddOrUpdate(new[] + { + ItemBase.Create(type: itemType, id: 1, version: 1), + ItemBase.Create(type: itemType, id: 2, version: 1), + ItemBase.Create(type: itemType, id: 3, version: 1) + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added"); + results.HasCompleted.Should().BeFalse(); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Should().BeEmpty("no disposals should have occurred"); + disposalsCompletedResults.HasCompleted.Should().BeFalse("no disposals should have occurred"); + + + var error = new Exception("Test"); + source.Items.ElementAt(1).FailDisposal(error); + + sourceCompletionSource.OnNext(Unit.Default); + + // RX and TPL don't guarantee Task continuations run synchronously with antecedent completion + await disposalsCompletedResults.WaitForFinalizationAsync(TimeSpan.FromSeconds(5)); + + results.Error.Should().BeNull("disposal errors should be propagated on disposalsCompleted"); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items were changed"); + results.HasCompleted.Should().BeTrue(); + + disposalsCompletedResults.Error.Should().Be(error, "disposal errors should be caught and propagated on disposalsCompleted"); + } + + [Theory] + [InlineData(ItemType.Plain)] + [InlineData(ItemType.Disposable)] + [InlineData(ItemType.AsyncDisposable)] + [InlineData(ItemType.ImmediateAsyncDisposable)] + public async Task ItemDisposalsComplete_DisposalsCompletedOccursAndCompletes(ItemType itemType) + { + using var source = new SourceCache(static item => item.Id); + using var sourceCompletionSource = new Subject(); + + ValueRecordingObserver? disposalsCompletedResults = null; + + using var subscription = source + .Connect() + .TakeUntil(sourceCompletionSource) + .AsyncDisposeMany(disposalsCompleted => + { + disposalsCompletedResults.Should().BeNull("disposalsCompletedAccessor should only be invoked once per subscription"); + disposalsCompleted.RecordValues(out disposalsCompletedResults); + }) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + disposalsCompletedResults.Should().NotBeNull("disposalsCompletedAccessor should have been invoked"); + + + source.AddOrUpdate(new[] + { + ItemBase.Create(type: itemType, id: 1, version: 1), + ItemBase.Create(type: itemType, id: 2, version: 1), + ItemBase.Create(type: itemType, id: 3, version: 1) + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added"); + results.HasCompleted.Should().BeFalse(); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Should().BeEmpty("the source has not completed"); + disposalsCompletedResults.HasCompleted.Should().BeFalse("the source has not completed"); + + + sourceCompletionSource.OnNext(Unit.Default); + foreach (var item in source.Items) + item.CompleteDisposal(); + + // RX and TPL don't guarantee Task continuations run synchronously with antecedent completion + await disposalsCompletedResults.WaitForFinalizationAsync(TimeSpan.FromSeconds(5)); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items were changed"); + results.HasCompleted.Should().BeTrue(); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Count.Should().Be(1, "the source and all disposals have completed"); + disposalsCompletedResults.HasCompleted.Should().BeTrue("the source and all disposals have completed"); + } + + [Fact] + public async Task ItemDisposalsOccurOnMultipleThreads_DisposalIsThreadSafe() + { + using var source = new SourceCache(static item => item.Id); + using var sourceCompletionSource = new Subject(); + + ValueRecordingObserver? disposalsCompletedResults = null; + + using var subscription = source + .Connect() + .TakeUntil(sourceCompletionSource) + .AsyncDisposeMany(disposalsCompleted => + { + disposalsCompletedResults.Should().BeNull("disposalsCompletedAccessor should only be invoked once per subscription"); + disposalsCompleted.RecordValues(out disposalsCompletedResults); + }) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + disposalsCompletedResults.Should().NotBeNull("disposalsCompletedAccessor should have been invoked"); + + + var items = Enumerable.Range(1, 100_000) + .Select(id => new AsyncDisposableItem() + { + Id = id, + Version = 1 + }) + .ToArray(); + + source.AddOrUpdate(items); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "items were added"); + results.HasCompleted.Should().BeFalse(); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Should().BeEmpty("the source has not completed"); + disposalsCompletedResults.HasCompleted.Should().BeFalse("the source has not completed"); + + + sourceCompletionSource.OnNext(); + await Task.WhenAll(items + .GroupBy(item => item.Id % 4) + .Select(group => Task.Run(() => + { + foreach (var item in group) + item.CompleteDisposal(); + }))); + + // RX and TPL don't guarantee Task continuations run synchronously with antecedent completion + await disposalsCompletedResults.WaitForFinalizationAsync(TimeSpan.FromSeconds(30)); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(items, "no items were removed"); + results.HasCompleted.Should().BeTrue(); + + items.Should().AllSatisfy(item => item.HasBeenDisposed.Should().BeTrue(), "disposable items should be disposed upon source completion"); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Count.Should().Be(1, "the source and all disposals have completed"); + disposalsCompletedResults.HasCompleted.Should().BeTrue("the source and all disposals have completed"); + } + + [Theory] + [InlineData(ItemType.Plain)] + [InlineData(ItemType.Disposable)] + [InlineData(ItemType.AsyncDisposable)] + [InlineData(ItemType.ImmediateAsyncDisposable)] + public void ItemsAreAddedMovedOrRefreshed_ItemsAreNotDisposed(ItemType itemType) + { + using var source = new Subject>(); + + ValueRecordingObserver? disposalsCompletedResults = null; + + using var subscription = source + .AsyncDisposeMany(disposalsCompleted => + { + disposalsCompletedResults.Should().BeNull("disposalsCompletedAccessor should only be invoked once per subscription"); + disposalsCompleted.RecordValues(out disposalsCompletedResults); + }) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + disposalsCompletedResults.Should().NotBeNull("disposalsCompletedAccessor should have been invoked"); + + + // Addition + var items = new List() + { + ItemBase.Create(type: itemType, id: 1, version: 1), + ItemBase.Create(type: itemType, id: 2, version: 1), + ItemBase.Create(type: itemType, id: 3, version: 1) + }; + + source.OnNext(new ChangeSet(items + .Select((item, index) => new Change( + reason: ChangeReason.Add, + key: item.Id, + current: item, + index: index)))); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsSorted.Should().BeEquivalentTo( + items, + options => options.WithStrictOrdering(), + "3 items were added"); + results.HasCompleted.Should().BeFalse(); + + items.Should().AllSatisfy(item => item.HasBeenDisposed.Should().BeFalse(), "items should not be disposed upon add"); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Should().BeEmpty("the source has not completed"); + disposalsCompletedResults.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Movement + items.Move(2, 0, items[2]); + items.Move(2, 1, items[2]); + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Moved, key: items[0].Id, current: items[0], previous: Optional.None(), currentIndex: 0, previousIndex: 2), + new(reason: ChangeReason.Moved, key: items[1].Id, current: items[1], previous: Optional.None(), currentIndex: 1, previousIndex: 2) + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItemsSorted.Should().BeEquivalentTo( + items, + options => options.WithStrictOrdering(), + "3 items were added"); + results.HasCompleted.Should().BeFalse(); + + items.Should().AllSatisfy(item => item.HasBeenDisposed.Should().BeFalse(), "items should not be disposed upon movement"); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Should().BeEmpty("the source has not completed"); + disposalsCompletedResults.HasCompleted.Should().BeFalse("the source has not completed"); + + + // Refreshing + source.OnNext(new ChangeSet(items + .Select((item, index) => new Change( + reason: ChangeReason.Refresh, + key: item.Id, + current: item, + index: index)))); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItemsSorted.Should().BeEquivalentTo( + items, + options => options.WithStrictOrdering(), + "3 items were added"); + results.HasCompleted.Should().BeFalse(); + + items.Should().AllSatisfy(item => item.HasBeenDisposed.Should().BeFalse(), "items should not be disposed upon refresh"); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Should().BeEmpty("the source has not completed"); + disposalsCompletedResults.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Theory] + [InlineData(ItemType.Plain)] + [InlineData(ItemType.Disposable)] + [InlineData(ItemType.AsyncDisposable)] + [InlineData(ItemType.ImmediateAsyncDisposable)] + public void ItemsAreRemoved_ItemsAreDisposedAfterDownstreamProcessing(ItemType itemType) + { + using var source = new SourceCache(static item => item.Id); + + ValueRecordingObserver? disposalsCompletedResults = null; + + using var subscription = source + .Connect() + .AsyncDisposeMany(disposalsCompleted => + { + disposalsCompletedResults.Should().BeNull("disposalsCompletedAccessor should only be invoked once per subscription"); + disposalsCompleted.RecordValues(out disposalsCompletedResults); + }) + .ValidateSynchronization() + .Do(changes => + { + foreach (var change in changes) + if (change.Reason is ChangeReason.Remove) + change.Current.HasBeenDisposed.Should().BeFalse("disposal should only occur after downstream processing has completed"); + }) + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + disposalsCompletedResults.Should().NotBeNull("disposalsCompletedAccessor should have been invoked"); + + + source.AddOrUpdate(new[] + { + ItemBase.Create(type: itemType, id: 1, version: 1), + ItemBase.Create(type: itemType, id: 2, version: 1), + ItemBase.Create(type: itemType, id: 3, version: 1) + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added"); + results.HasCompleted.Should().BeFalse(); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Should().BeEmpty("the source has not completed"); + disposalsCompletedResults.HasCompleted.Should().BeFalse("the source has not completed"); + + + var items = source.Items.ToArray(); + source.Clear(); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEmpty("all items were removed"); + results.HasCompleted.Should().BeFalse(); + + items.Where(static item => item.CanBeDisposed).Should().AllSatisfy(item => item.HasBeenDisposed.Should().BeTrue(), "disposable items should be disposed after removal"); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Should().BeEmpty("the source has not completed"); + disposalsCompletedResults.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Theory] + [InlineData(ItemType.Plain)] + [InlineData(ItemType.Disposable)] + [InlineData(ItemType.AsyncDisposable)] + [InlineData(ItemType.ImmediateAsyncDisposable)] + public void ItemsAreUpdated_PreviousItemsAreDisposedAfterDownstreamProcessing(ItemType itemType) + { + using var source = new SourceCache(static item => item.Id); + + ValueRecordingObserver? disposalsCompletedResults = null; + + using var subscription = source + .Connect() + .AsyncDisposeMany(disposalsCompleted => + { + disposalsCompletedResults.Should().BeNull("disposalsCompletedAccessor should only be invoked once per subscription"); + disposalsCompleted.RecordValues(out disposalsCompletedResults); + }) + .ValidateSynchronization() + .Do(changes => + { + foreach (var change in changes) + if (change.Reason is ChangeReason.Remove) + change.Current.HasBeenDisposed.Should().BeFalse("disposal should only occur after downstream processing has completed"); + }) + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + disposalsCompletedResults.Should().NotBeNull("disposalsCompletedAccessor should have been invoked"); + + + source.AddOrUpdate(new[] + { + ItemBase.Create(type: itemType, id: 1, version: 1), + ItemBase.Create(type: itemType, id: 2, version: 1), + ItemBase.Create(type: itemType, id: 3, version: 1) + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added"); + results.HasCompleted.Should().BeFalse(); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Should().BeEmpty("the source has not completed"); + disposalsCompletedResults.HasCompleted.Should().BeFalse("the source has not completed"); + + + var previousItems = source.Items.ToArray(); + source.AddOrUpdate(new[] + { + ItemBase.Create(type: itemType, id: 1, version: 2), + ItemBase.Create(type: itemType, id: 2, version: 2), + ItemBase.Create(type: itemType, id: 3, version: 2) + }); + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "all items were replaced"); + results.HasCompleted.Should().BeFalse(); + + previousItems.Where(static item => item.CanBeDisposed).Should().AllSatisfy(item => item.HasBeenDisposed.Should().BeTrue(), "disposable items should be disposed after replacement"); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Should().BeEmpty("the source has not completed"); + disposalsCompletedResults.HasCompleted.Should().BeFalse("the source has not completed"); + } + + [Fact] + public void OnDisposalsCompletedIsNull_ThrowsException() + => FluentActions.Invoking(() => ObservableCacheEx.AsyncDisposeMany( + source: Observable.Empty>(), + disposalsCompletedAccessor: null!)) + .Should() + .Throw(); + + [Theory] + [InlineData(SourceType.Subject)] + [InlineData(SourceType.Immediate)] + public void SourceCompletes_ItemsAreDisposedAndCompletionPropagates(SourceType sourceType) + { + var items = new[] + { + new ImmediateAsyncDisposableItem() { Id = 1, Version = 1}, + new ImmediateAsyncDisposableItem() { Id = 2, Version = 1}, + new ImmediateAsyncDisposableItem() { Id = 3, Version = 1} + }; + + var changeSet = new ChangeSet(items + .Select(item => new Change(reason: ChangeReason.Add, key: item.Id, current: item))); + + IObservable> source = (sourceType is SourceType.Immediate) + ? Observable.Return(changeSet) + : new Subject>(); + + + ValueRecordingObserver? disposalsCompletedResults = null; + + using var subscription = source + .AsyncDisposeMany(disposalsCompleted => + { + disposalsCompletedResults.Should().BeNull("disposalsCompletedAccessor should only be invoked once per subscription"); + disposalsCompleted.RecordValues(out disposalsCompletedResults); + }) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + disposalsCompletedResults.Should().NotBeNull("disposalsCompletedAccessor should have been invoked"); + + + if (source is Subject> subject) + { + subject.OnNext(changeSet); + subject.OnCompleted(); + } + + results.Error.Should().BeNull(); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(items, "3 items were added"); + results.HasCompleted.Should().BeTrue(); + + items.Should().AllSatisfy(item => item.HasBeenDisposed.Should().BeTrue(), "disposable items should be disposed upon source completion"); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Count.Should().Be(1, "all items have completed disposal"); + disposalsCompletedResults.HasCompleted.Should().BeTrue("all items have completed disposal"); + } + + [Theory] + [InlineData(SourceType.Subject)] + [InlineData(SourceType.Immediate)] + public void SourceErrors_ItemsAreDisposedAndErrorPropagates(SourceType sourceType) + { + var items = new[] + { + new ImmediateAsyncDisposableItem() { Id = 1, Version = 1}, + new ImmediateAsyncDisposableItem() { Id = 2, Version = 1}, + new ImmediateAsyncDisposableItem() { Id = 3, Version = 1} + }; + + var changeSet = new ChangeSet(items + .Select(item => new Change(reason: ChangeReason.Add, key: item.Id, current: item))); + + var error = new Exception("Test"); + + IObservable> source = (sourceType is SourceType.Immediate) + ? Observable.Return(changeSet) + .Concat(Observable.Throw>(error)) + : new Subject>(); + + + ValueRecordingObserver? disposalsCompletedResults = null; + + using var subscription = source + .AsyncDisposeMany(disposalsCompleted => + { + disposalsCompletedResults.Should().BeNull("disposalsCompletedAccessor should only be invoked once per subscription"); + disposalsCompleted.RecordValues(out disposalsCompletedResults); + }) + .ValidateSynchronization() + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results); + + disposalsCompletedResults.Should().NotBeNull("disposalsCompletedAccessor should have been invoked"); + + + if (source is Subject> subject) + { + subject.OnNext(changeSet); + subject.OnError(error); + } + + results.Error.Should().Be(error); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(items, "3 items were added"); + + items.Should().AllSatisfy(item => item.HasBeenDisposed.Should().BeTrue(), "disposable items should be disposed upon source failure"); + + disposalsCompletedResults.Error.Should().BeNull(); + disposalsCompletedResults.RecordedValues.Count.Should().Be(1, "all items have completed disposal"); + disposalsCompletedResults.HasCompleted.Should().BeTrue("all items have completed disposal"); + } + + [Fact] + public void SourceIsNull_ThrowsException() + => FluentActions.Invoking(() => ObservableCacheEx.AsyncDisposeMany( + source: null!, + disposalsCompletedAccessor: static _ => { })) + .Should() + .Throw(); + + public abstract record ItemBase + { + public static ItemBase Create( + ItemType type, + int id, + int version) + => type switch + { + ItemType.Plain => new PlainItem() + { + Id = id, + Version = version + }, + ItemType.Disposable => new DisposableItem() + { + Id = id, + Version = version + }, + ItemType.AsyncDisposable => new AsyncDisposableItem() + { + Id = id, + Version = version + }, + ItemType.ImmediateAsyncDisposable => new ImmediateAsyncDisposableItem() + { + Id = id, + Version = version + }, + _ => throw new ArgumentException($"{type} is not a valid {nameof(ItemType)} value", nameof(type)) + }; + + public required int Id { get; init; } + + public required int Version { get; init; } + + public abstract bool CanBeDisposed { get; } + + public abstract bool HasBeenDisposed { get; } + + public abstract void CompleteDisposal(); + + public abstract void FailDisposal(Exception error); + } + + public sealed record PlainItem + : ItemBase + { + public override bool CanBeDisposed + => false; + + public override bool HasBeenDisposed + => false; + + public override void CompleteDisposal() { } + + public override void FailDisposal(Exception error) { } + } + + public sealed record DisposableItem + : ItemBase, IDisposable + { + public override bool CanBeDisposed + => true; + + public override bool HasBeenDisposed + => _hasBeenDisposed; + + public override void CompleteDisposal() { } + + public override void FailDisposal(Exception error) + => _disposeError = error; + + public void Dispose() + { + if (_disposeError is not null) + #pragma warning disable CA1065 // Do not raise exceptions in unexpected locations + throw _disposeError; + #pragma warning restore CA1065 // Do not raise exceptions in unexpected locations + + _hasBeenDisposed = true; + } + + private Exception? _disposeError; + private bool _hasBeenDisposed; + } + + public sealed record AsyncDisposableItem + : ItemBase, IAsyncDisposable + { + public override bool CanBeDisposed + => true; + + public override bool HasBeenDisposed + => _hasBeenDisposed; + + public override void CompleteDisposal() + => _disposeCompletionSource.SetResult(); + + public override void FailDisposal(Exception error) + => _disposeCompletionSource.SetException(error); + + public ValueTask DisposeAsync() + { + _hasBeenDisposed = true; + + return new(_disposeCompletionSource.Task); + } + + private readonly TaskCompletionSource _disposeCompletionSource + = new(); + + private bool _hasBeenDisposed; + } + + public sealed record ImmediateAsyncDisposableItem + : ItemBase, IAsyncDisposable + { + public override bool CanBeDisposed + => true; + + public override bool HasBeenDisposed + => _hasBeenDisposed; + + public override void CompleteDisposal() { } + + public override void FailDisposal(Exception error) + => _disposeError = error; + + public ValueTask DisposeAsync() + { + _hasBeenDisposed = true; + + return (_disposeError is not null) + ? ValueTask.FromException(_disposeError) + : ValueTask.CompletedTask; + } + + private Exception? _disposeError; + private bool _hasBeenDisposed; + } +} diff --git a/src/DynamicData.Tests/Utilities/RecordingObserverBase.cs b/src/DynamicData.Tests/Utilities/RecordingObserverBase.cs index 6e29b35f..09718e7f 100644 --- a/src/DynamicData.Tests/Utilities/RecordingObserverBase.cs +++ b/src/DynamicData.Tests/Utilities/RecordingObserverBase.cs @@ -2,6 +2,8 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive; +using System.Threading; +using System.Threading.Tasks; using Microsoft.Reactive.Testing; @@ -11,6 +13,7 @@ namespace DynamicData.Tests.Utilities; public abstract class RecordingObserverBase : IObserver { + private readonly TaskCompletionSource _finalizationSource; private readonly List>> _notifications; private readonly IScheduler _scheduler; @@ -19,6 +22,7 @@ public abstract class RecordingObserverBase protected RecordingObserverBase(IScheduler scheduler) { + _finalizationSource = new(); _notifications = new(); _scheduler = scheduler; } @@ -35,6 +39,15 @@ public bool HasFinalized public IReadOnlyList>> Notifications => _notifications; + public async Task WaitForFinalizationAsync(TimeSpan timeout) + { + using var timeoutSource = new CancellationTokenSource(timeout); + + await await Task.WhenAny( + _finalizationSource.Task, + Task.Delay(Timeout.Infinite, timeoutSource.Token)); + } + protected abstract void OnNext(T value); void IObserver.OnCompleted() @@ -44,6 +57,7 @@ void IObserver.OnCompleted() value: Notification.CreateOnCompleted())); _hasCompleted = true; + _finalizationSource.SetResult(); } void IObserver.OnError(Exception error) @@ -54,6 +68,7 @@ void IObserver.OnError(Exception error) if (!HasFinalized) _error = error; + _finalizationSource.SetResult(); } void IObserver.OnNext(T value) diff --git a/src/DynamicData/Cache/Internal/AsyncDisposeMany.cs b/src/DynamicData/Cache/Internal/AsyncDisposeMany.cs new file mode 100644 index 00000000..c4d28265 --- /dev/null +++ b/src/DynamicData/Cache/Internal/AsyncDisposeMany.cs @@ -0,0 +1,122 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive; +using System.Reactive.Disposables; +using System.Reactive.Linq; +using System.Reactive.Subjects; + +using DynamicData.Internal; + +namespace DynamicData.Cache.Internal; + +#if SUPPORTS_ASYNC_DISPOSABLE +internal static class AsyncDisposeMany + where TObject : notnull + where TKey : notnull +{ + public static IObservable> Create( + IObservable> source, + Action> disposalsCompletedAccessor) + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + disposalsCompletedAccessor.ThrowArgumentNullExceptionIfNull(nameof(disposalsCompletedAccessor)); + + return Observable + .Create>(downstreamObserver => + { + var itemsByKey = new Dictionary(); + + var synchronizationGate = InternalEx.NewLock(); + + var disposals = new Subject>(); + var disposalsCompleted = disposals + .Merge() + .IgnoreElements() + .Concat(Observable.Return(Unit.Default)) + // If no one subscribes to this stream, disposals won't actually occur, so make sure we have one (and only one) regardless of what the consumer does. + .Publish() + .AutoConnect(0); + + // Make sure the consumer gets a chance to subscribe BEFORE we actually start processing items, so there's no risk of the consumer missing notifications. + disposalsCompletedAccessor.Invoke(disposalsCompleted); + + var sourceSubscription = source + .Synchronize(synchronizationGate) + // Using custom notification handlers instead of .Do() to make sure that we're not disposing items until AFTER we've notified all downstream listeners to remove them from their cached or bound collections. + .SubscribeSafe( + onNext: upstreamChanges => + { + downstreamObserver.OnNext(upstreamChanges); + + foreach (var change in upstreamChanges.ToConcreteType()) + { + switch (change.Reason) + { + case ChangeReason.Update: + if (change.Previous.HasValue && !EqualityComparer.Default.Equals(change.Current, change.Previous.Value)) + TryDisposeItem(change.Previous.Value); + break; + + case ChangeReason.Remove: + TryDisposeItem(change.Current); + break; + } + } + + itemsByKey.Clone(upstreamChanges); + }, + onError: error => + { + downstreamObserver.OnError(error); + + TearDown(); + }, + onCompleted: () => + { + downstreamObserver.OnCompleted(); + + TearDown(); + }); + + return Disposable.Create(() => + { + lock (synchronizationGate) + { + sourceSubscription.Dispose(); + + TearDown(); + } + }); + + void TearDown() + { + if (disposals.HasObservers) + { + try + { + foreach (var item in itemsByKey.Values) + TryDisposeItem(item); + disposals.OnCompleted(); + + itemsByKey.Clear(); + } + catch (Exception error) + { + disposals.OnError(error); + } + } + } + + void TryDisposeItem(TObject item) + { + if (item is IDisposable disposable) + disposable.Dispose(); + else if (item is IAsyncDisposable asyncDisposable) + disposals.OnNext(Observable.FromAsync(() => asyncDisposable.DisposeAsync().AsTask())); + } + }); + } +} +#endif diff --git a/src/DynamicData/Cache/ObservableCacheEx.cs b/src/DynamicData/Cache/ObservableCacheEx.cs index aa548dbb..308b2dc0 100644 --- a/src/DynamicData/Cache/ObservableCacheEx.cs +++ b/src/DynamicData/Cache/ObservableCacheEx.cs @@ -299,6 +299,47 @@ public static IObservableCache AsObservableCache(t return new LockFreeObservableCache(source); } + #if SUPPORTS_ASYNC_DISPOSABLE + /// + /// + /// Automatically disposes items within the source collection, upon removal of the collection or teardown of the operator. + /// + /// + /// Individual items are disposed after removal or replacement changes have been sent downstream. + /// All items previously-published on the stream are disposed after the stream finalizes. + /// This includes both upstream completion or failure, or downstream un-subscription. + /// + /// + /// Disposal is supported for both and items. + /// Items implementing neither of these interfaces are unaffected by this operator. + /// + /// + /// The type of items in the source collection. + /// The type of key values used to uniquely identify items in the source collection. + /// A stream of changes from the source collection. + /// + /// + /// An action to be invoked upon each subscription to this operator, allowing the consumer access to the "disposalsCompleted" stream for that subscription. + /// + /// + /// The "disposalsCompleted" stream allows the consumer to properly observe the asynchronous disposal of any items that are disposed by the operator. This stream will emit a single value, and then complete, upon successfull completion of all invocations performed by the operator. + /// + /// + /// Providing these notifications within a downstream channel separate from the main collection change stream ensures that these notifications can be observed even in the event of a failure within the operator, or within stream. + /// + /// + /// A stream containing copies of all changes observed from . + /// Throws for and . + public static IObservable> AsyncDisposeMany( + this IObservable> source, + Action> disposalsCompletedAccessor) + where TObject : notnull + where TKey : notnull + => Cache.Internal.AsyncDisposeMany.Create( + source: source, + disposalsCompletedAccessor: disposalsCompletedAccessor); + #endif + /// /// Automatically refresh downstream operators when any properties change. ///