Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Finally.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
Expand All @@ -24,7 +25,6 @@ public Finally(IObservable<TSource> source, Action finallyAction)
internal sealed class _ : IdentitySink<TSource>
{
private readonly Action _finallyAction;

private IDisposable _sourceDisposable;

public _(Action finallyAction, IObserver<TSource> observer)
Expand All @@ -35,19 +35,43 @@ public _(Action finallyAction, IObserver<TSource> observer)

public override void Run(IObservable<TSource> source)
{
Disposable.SetSingle(ref _sourceDisposable, source.SubscribeSafe(this));
var d = source.SubscribeSafe(this);

if (Interlocked.CompareExchange(ref _sourceDisposable, d, null) == BooleanDisposable.True)
{
// The Dispose(bool) methode was already called before the
// subscription could be assign, hence the subscription
// needs to be diposed here and the action needs to be invoked.
try
{
d.Dispose();
}
finally
{
_finallyAction();
}
}
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

if (disposing)
{
if (Disposable.TryDispose(ref _sourceDisposable))
var d = Interlocked.Exchange(ref _sourceDisposable, BooleanDisposable.True);
if (d != BooleanDisposable.True && d != null)
{
_finallyAction();
try
{
d.Dispose();
}
finally
{
_finallyAction();
}
}
}
base.Dispose(disposing);
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Using.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,37 @@ public _(IObserver<TSource> observer)
public void Run(Using<TSource, TResource> parent)
{
var source = default(IObservable<TSource>);
var disposable = Disposable.Empty;
try
{
var resource = parent._resourceFactory();
if (resource != null)
{
Disposable.SetSingle(ref _disposable, resource);
disposable = resource;
}

source = parent._observableFactory(resource);
}
catch (Exception exception)
{
SetUpstream(Observable.Throw<TSource>(exception).SubscribeSafe(this));

return;
source = Observable.Throw<TSource>(exception);
}

// It is important to set the disposable resource after
// Run(). In the synchronous case this would else dispose
// the the resource before the source subscription.
Run(source);
Disposable.SetSingle(ref _disposable, disposable);
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

if (disposing)
{
Disposable.TryDispose(ref _disposable);
}
base.Dispose(disposing);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using Xunit;
Expand Down Expand Up @@ -142,5 +143,48 @@ public void Finally_Throw()
);
}

[Fact]
public void Finally_DisposeOrder_Empty()
{
var order = "";
Observable
.Empty<Unit>()
.Finally(() => order += "1")
.Finally(() => order += "2")
.Finally(() => order += "3")
.Subscribe();

Assert.Equal("123", order);
}

[Fact]
public void Finally_DisposeOrder_Return()
{
var order = "";
Observable
.Return(Unit.Default)
.Finally(() => order += "1")
.Finally(() => order += "2")
.Finally(() => order += "3")
.Subscribe();

Assert.Equal("123", order);
}

[Fact]
public void Finally_DisposeOrder_Never()
{
var order = "";
var d = Observable
.Never<Unit>()
.Finally(() => order += "1")
.Finally(() => order += "2")
.Finally(() => order += "3")
.Subscribe();

d.Dispose();

Assert.Equal("123", order);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using ReactiveTests.Dummies;
Expand Down Expand Up @@ -295,5 +297,35 @@ public void Using_ThrowResourceUsage()
);
}

[Fact]
public void Using_NestedCompleted()
{
var order = "";

Observable.Using(() => Disposable.Create(() => order += "3"),
_ => Observable.Using(() => Disposable.Create(() => order += "2"),
__ => Observable.Using(() => Disposable.Create(() => order += "1"),
___ => Observable.Return(Unit.Default))))
.Finally(() => order += "4")
.Subscribe();

Assert.Equal("1234", order);
}

[Fact]
public void Using_NestedDisposed()
{
var order = "";

Observable.Using(() => Disposable.Create(() => order += "3"),
_ => Observable.Using(() => Disposable.Create(() => order += "2"),
__ => Observable.Using(() => Disposable.Create(() => order += "1"),
___ => Observable.Never<Unit>())))
.Finally(() => order += "4")
.Subscribe()
.Dispose();

Assert.Equal("1234", order);
}
}
}
2 changes: 1 addition & 1 deletion Rx.NET/Source/version.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "4.1.5",
"version": "4.1.6",
"publicReleaseRefSpec": [
"^refs/heads/master$", // we release out of master
"^refs/heads/rel/v\\d+\\.\\d+" // we also release branches starting with rel/vN.N
Expand Down