Skip to content

Commit 77f9539

Browse files
authored
Generalized Solution for avoiding Out-of-Order Cache Child ChangeSets (#1009)
Created helper class to manage a cache changeset where each item owns a child subscription and adapted the following to use it: 1) TransformOnObservable 2) MergeManyChangeSets 3) MergeManyChangeSets (Source Compare) 4) MergeManyChangeSets (Cache -> List) 5) TransformManyAsync 6) GroupOnObservable To ensure all operations are handled in order, using a single pass through the ChangeSet, so the changeset order is preserved for the downstream.
1 parent 50a4c48 commit 77f9539

11 files changed

+514
-344
lines changed

src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,47 @@ public void MergeManyChangeSetsWorksCorrectlyWithValueTypes()
811811
results.Summary.Overall.Removes.Should().Be(PricesPerMarket);
812812
}
813813

814+
815+
[Theory]
816+
[InlineData(true)]
817+
[InlineData(false)]
818+
public void OrderOfChangesIsPreserved(bool removeFirst)
819+
{
820+
// Arrange
821+
var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
822+
AddUniquePrices(markets);
823+
_marketCache.AddOrUpdate(markets);
824+
var markets2 = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
825+
AddUniquePrices(markets2);
826+
using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator();
827+
(var firstReason, var nextReason, int expectedChanges) = removeFirst
828+
? (ChangeReason.Remove, ChangeReason.Add, 2 * MarketCount * PricesPerMarket)
829+
: (ChangeReason.Add, ChangeReason.Remove, 3 * MarketCount * PricesPerMarket);
830+
831+
// Act
832+
_marketCache.Edit(updater =>
833+
{
834+
if (removeFirst)
835+
{
836+
updater.Clear();
837+
updater.AddOrUpdate(markets2);
838+
}
839+
else
840+
{
841+
842+
updater.AddOrUpdate(markets2);
843+
updater.Clear();
844+
}
845+
});
846+
847+
// Assert
848+
results.Messages.Count.Should().Be(2);
849+
results.Messages[0].All(change => change.Reason is ChangeReason.Add).Should().BeTrue();
850+
results.Messages[1].Count.Should().Be(expectedChanges);
851+
results.Messages[1].Take(MarketCount * PricesPerMarket).All(change => change.Reason == firstReason).Should().BeTrue();
852+
results.Messages[1].Skip(MarketCount * PricesPerMarket).All(change => change.Reason == nextReason).Should().BeTrue();
853+
}
854+
814855
public void Dispose()
815856
{
816857
_marketCacheResults.Dispose();

src/DynamicData/Cache/Internal/ChangeSetCache.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ internal sealed class ChangeSetCache<TObject, TKey>
1616
where TKey : notnull
1717
{
1818
public ChangeSetCache(IObservable<IChangeSet<TObject, TKey>> source) =>
19-
Source = source.IgnoreSameReferenceUpdate().Do(Cache.Clone);
19+
Source = source.Do(Cache.Clone);
2020

2121
public Cache<TObject, TKey> Cache { get; } = new();
2222

src/DynamicData/Cache/Internal/DynamicGrouper.cs

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -44,37 +44,44 @@ public void ProcessChangeSet(IChangeSet<TObject, TKey> changeSet, IObserver<IGro
4444

4545
foreach (var change in changeSet.ToConcreteType())
4646
{
47-
switch (change.Reason)
48-
{
49-
case ChangeReason.Add when _groupSelector is not null:
50-
PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker);
51-
break;
47+
ProcessChange(change, suspendTracker);
48+
}
49+
50+
if (observer != null)
51+
{
52+
EmitChanges(observer);
53+
}
54+
}
5255

53-
case ChangeReason.Remove:
54-
PerformRemove(change.Key, suspendTracker);
55-
break;
56+
public void ProcessChange(Change<TObject, TKey> change) => ProcessChange(change, _suspendTracker);
5657

57-
case ChangeReason.Update when _groupSelector is not null:
58-
PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker);
59-
break;
58+
private void ProcessChange(Change<TObject, TKey> change, SuspendTracker? suspendTracker)
59+
{
60+
switch (change.Reason)
61+
{
62+
case ChangeReason.Add when _groupSelector is not null:
63+
PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker);
64+
break;
6065

61-
case ChangeReason.Update:
62-
PerformUpdate(change.Key, suspendTracker);
63-
break;
66+
case ChangeReason.Remove:
67+
PerformRemove(change.Key, suspendTracker);
68+
break;
6469

65-
case ChangeReason.Refresh when _groupSelector is not null:
66-
PerformRefresh(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker);
67-
break;
70+
case ChangeReason.Update when _groupSelector is not null:
71+
PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker);
72+
break;
6873

69-
case ChangeReason.Refresh:
70-
PerformRefresh(change.Key, suspendTracker);
71-
break;
72-
}
73-
}
74+
case ChangeReason.Update:
75+
PerformUpdate(change.Key, suspendTracker);
76+
break;
7477

75-
if (observer != null)
76-
{
77-
EmitChanges(observer);
78+
case ChangeReason.Refresh when _groupSelector is not null:
79+
PerformRefresh(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker);
80+
break;
81+
82+
case ChangeReason.Refresh:
83+
PerformRefresh(change.Key, suspendTracker);
84+
break;
7885
}
7986
}
8087

src/DynamicData/Cache/Internal/GroupOnObservable.cs

Lines changed: 51 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// Roland Pheasant licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for full license information.
44

5-
using System.Reactive.Disposables;
65
using System.Reactive.Linq;
76
using DynamicData.Internal;
87

@@ -13,49 +12,58 @@ internal sealed class GroupOnObservable<TObject, TKey, TGroupKey>(IObservable<IC
1312
where TKey : notnull
1413
where TGroupKey : notnull
1514
{
16-
public IObservable<IGroupChangeSet<TObject, TKey, TGroupKey>> Run() => Observable.Create<IGroupChangeSet<TObject, TKey, TGroupKey>>(observer =>
15+
public IObservable<IGroupChangeSet<TObject, TKey, TGroupKey>> Run() =>
16+
Observable.Create<IGroupChangeSet<TObject, TKey, TGroupKey>>(observer => new Subscription(source, selectGroup, observer));
17+
18+
// Maintains state for a single subscription
19+
private sealed class Subscription : CacheParentSubscription<TObject, TKey, (TGroupKey, TObject), IGroupChangeSet<TObject, TKey, TGroupKey>>
1720
{
18-
var grouper = new DynamicGrouper<TObject, TKey, TGroupKey>();
19-
var locker = InternalEx.NewLock();
20-
var parentUpdate = false;
21-
22-
IObservable<TGroupKey> CreateGroupObservable(TObject item, TKey key) =>
23-
selectGroup(item, key)
24-
.DistinctUntilChanged()
25-
.Synchronize(locker!)
26-
.Do(
27-
onNext: groupKey => grouper!.AddOrUpdate(key, groupKey, item, !parentUpdate ? observer : null),
28-
onError: observer.OnError);
29-
30-
// Create a shared connection to the source
31-
var shared = source
32-
.Synchronize(locker)
33-
.Do(_ => parentUpdate = true)
34-
.Publish();
35-
36-
// First process the changesets
37-
var subChanges = shared
38-
.SubscribeSafe(
39-
onNext: changeSet => grouper.ProcessChangeSet(changeSet),
40-
onError: observer.OnError);
41-
42-
// Next process the Grouping observables created for each item
43-
var subMergeMany = shared
44-
.MergeMany(CreateGroupObservable)
45-
.SubscribeSafe(
46-
onError: observer.OnError,
47-
onCompleted: observer.OnCompleted);
48-
49-
// Finally, emit the results
50-
var subResults = shared
51-
.SubscribeSafe(
52-
onNext: _ =>
21+
private readonly DynamicGrouper<TObject, TKey, TGroupKey> _grouper = new();
22+
private readonly Func<TObject, TKey, IObservable<TGroupKey>> _selectGroup;
23+
24+
public Subscription(IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TKey, IObservable<TGroupKey>> selectGroup, IObserver<IGroupChangeSet<TObject, TKey, TGroupKey>> observer)
25+
: base(observer)
26+
{
27+
_selectGroup = selectGroup;
28+
CreateParentSubscription(source);
29+
}
30+
31+
protected override void ParentOnNext(IChangeSet<TObject, TKey> changes)
32+
{
33+
// Process all the changes at once to preserve the changeset order
34+
foreach (var change in changes.ToConcreteType())
35+
{
36+
_grouper.ProcessChange(change);
37+
38+
switch (change.Reason)
5339
{
54-
grouper.EmitChanges(observer);
55-
parentUpdate = false;
56-
},
57-
onError: observer.OnError);
40+
// Shutdown existing sub (if any) and create a new one that
41+
// Will update the group key for the current item
42+
case ChangeReason.Add or ChangeReason.Update:
43+
AddGroupSubscription(change.Current, change.Key);
44+
break;
45+
46+
// Shutdown the existing subscription
47+
case ChangeReason.Remove:
48+
RemoveChildSubscription(change.Key);
49+
break;
50+
}
51+
}
52+
}
53+
54+
protected override void ChildOnNext((TGroupKey, TObject) tuple, TKey parentKey) =>
55+
_grouper.AddOrUpdate(parentKey, tuple.Item1, tuple.Item2);
56+
57+
protected override void EmitChanges(IObserver<IGroupChangeSet<TObject, TKey, TGroupKey>> observer) =>
58+
_grouper.EmitChanges(observer);
59+
60+
protected override void Dispose(bool disposing)
61+
{
62+
_grouper.Dispose();
63+
base.Dispose(disposing);
64+
}
5865

59-
return new CompositeDisposable(shared.Connect(), subMergeMany, subChanges, grouper);
60-
});
66+
private void AddGroupSubscription(TObject obj, TKey key) =>
67+
AddChildSubscription(MakeChildObservable(_selectGroup(obj, key).DistinctUntilChanged().Select(groupKey => (groupKey, obj))), key);
68+
}
6169
}

src/DynamicData/Cache/Internal/MergeChangeSets.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@ public IObservable<IChangeSet<TObject, TKey>> Run() => Observable.Create<IChange
4343
// Can optimize for the Add case because that's the only one that applies
4444
#if NET9_0_OR_GREATER
4545
private static Change<ChangeSetCache<TObject, TKey>, int> CreateChange(IObservable<IChangeSet<TObject, TKey>> source, int index, Lock locker) =>
46-
new(ChangeReason.Add, index, new ChangeSetCache<TObject, TKey>(source.Synchronize(locker)));
46+
new(ChangeReason.Add, index, new ChangeSetCache<TObject, TKey>(source.IgnoreSameReferenceUpdate().Synchronize(locker)));
4747

4848
// Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable
4949
private static IObservable<IChangeSet<ChangeSetCache<TObject, TKey>, int>> CreateContainerObservable(IObservable<IObservable<IChangeSet<TObject, TKey>>> source, Lock locker) =>
5050
source.Select((src, index) => new ChangeSet<ChangeSetCache<TObject, TKey>, int>(new[] { CreateChange(src, index, locker) }));
5151
#else
5252
private static Change<ChangeSetCache<TObject, TKey>, int> CreateChange(IObservable<IChangeSet<TObject, TKey>> source, int index, object locker) =>
53-
new(ChangeReason.Add, index, new ChangeSetCache<TObject, TKey>(source.Synchronize(locker)));
53+
new(ChangeReason.Add, index, new ChangeSetCache<TObject, TKey>(source.IgnoreSameReferenceUpdate().Synchronize(locker)));
5454

5555
// Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable
5656
private static IObservable<IChangeSet<ChangeSetCache<TObject, TKey>, int>> CreateContainerObservable(IObservable<IObservable<IChangeSet<TObject, TKey>>> source, object locker) =>

src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs

Lines changed: 56 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// Roland Pheasant licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for full license information.
44

5-
using System.Reactive.Disposables;
65
using System.Reactive.Linq;
76
using DynamicData.Internal;
87

@@ -11,53 +10,68 @@ namespace DynamicData.Cache.Internal;
1110
/// <summary>
1211
/// Operator that is similiar to MergeMany but intelligently handles Cache ChangeSets.
1312
/// </summary>
14-
internal sealed class MergeManyCacheChangeSets<TObject, TKey, TDestination, TDestinationKey>(IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TKey, IObservable<IChangeSet<TDestination, TDestinationKey>>> selector, IEqualityComparer<TDestination>? equalityComparer, IComparer<TDestination>? comparer)
13+
internal sealed class MergeManyCacheChangeSets<TObject, TKey, TDestination, TDestinationKey>(IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TKey, IObservable<IChangeSet<TDestination, TDestinationKey>>> changeSetSelector, IEqualityComparer<TDestination>? equalityComparer, IComparer<TDestination>? comparer)
1514
where TObject : notnull
1615
where TKey : notnull
1716
where TDestination : notnull
1817
where TDestinationKey : notnull
1918
{
2019
public IObservable<IChangeSet<TDestination, TDestinationKey>> Run() => Observable.Create<IChangeSet<TDestination, TDestinationKey>>(
21-
observer =>
20+
observer => new Subscription(source, changeSetSelector, observer, equalityComparer, comparer));
21+
22+
// Maintains state for a single subscription
23+
private sealed class Subscription : CacheParentSubscription<ChangeSetCache<TDestination, TDestinationKey>, TKey, IChangeSet<TDestination, TDestinationKey>, IChangeSet<TDestination, TDestinationKey>>
24+
{
25+
private readonly Cache<ChangeSetCache<TDestination, TDestinationKey>, TKey> _cache = new();
26+
private readonly ChangeSetMergeTracker<TDestination, TDestinationKey> _changeSetMergeTracker;
27+
28+
public Subscription(
29+
IObservable<IChangeSet<TObject, TKey>> source,
30+
Func<TObject, TKey, IObservable<IChangeSet<TDestination, TDestinationKey>>> changeSetSelector,
31+
IObserver<IChangeSet<TDestination, TDestinationKey>> observer,
32+
IEqualityComparer<TDestination>? equalityComparer,
33+
IComparer<TDestination>? comparer)
34+
: base(observer)
35+
{
36+
_changeSetMergeTracker = new(() => _cache.Items, comparer, equalityComparer);
37+
38+
// Child Observable has to go into the ChangeSetCache so the locking protects it
39+
CreateParentSubscription(source.Transform((obj, key) =>
40+
new ChangeSetCache<TDestination, TDestinationKey>(MakeChildObservable(changeSetSelector(obj, key).IgnoreSameReferenceUpdate()))));
41+
}
42+
43+
protected override void ParentOnNext(IChangeSet<ChangeSetCache<TDestination, TDestinationKey>, TKey> changes)
2244
{
23-
var locker = InternalEx.NewLock();
24-
var cache = new Cache<ChangeSetCache<TDestination, TDestinationKey>, TKey>();
25-
var parentUpdate = false;
26-
27-
// This is manages all of the changes
28-
var changeTracker = new ChangeSetMergeTracker<TDestination, TDestinationKey>(() => cache.Items, comparer, equalityComparer);
29-
30-
// Transform to a cache changeset of child caches, synchronize, update the local copy, and publish.
31-
var shared = source
32-
.Transform((obj, key) => new ChangeSetCache<TDestination, TDestinationKey>(selector(obj, key).Synchronize(locker)))
33-
.Synchronize(locker)
34-
.Do(changes =>
45+
// Process all the changes at once to preserve the changeset order
46+
foreach (var change in changes.ToConcreteType())
47+
{
48+
switch (change.Reason)
3549
{
36-
cache.Clone(changes);
37-
parentUpdate = true;
38-
})
39-
.Publish();
40-
41-
// Merge the child changeset changes together and apply to the tracker
42-
var subMergeMany = shared
43-
.MergeMany(cacheChangeSet => cacheChangeSet.Source)
44-
.SubscribeSafe(
45-
changes => changeTracker.ProcessChangeSet(changes, !parentUpdate ? observer : null),
46-
observer.OnError,
47-
observer.OnCompleted);
48-
49-
// When a source item is removed, all of its sub-items need to be removed
50-
var subRemove = shared
51-
.OnItemRemoved(changeSetCache => changeTracker.RemoveItems(changeSetCache.Cache.KeyValues), invokeOnUnsubscribe: false)
52-
.OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues))
53-
.SubscribeSafe(
54-
_ =>
55-
{
56-
changeTracker.EmitChanges(observer);
57-
parentUpdate = false;
58-
},
59-
observer.OnError);
60-
61-
return new CompositeDisposable(shared.Connect(), subMergeMany, subRemove);
62-
});
50+
// Shutdown existing sub (if any) and create a new one that
51+
// Will update the cache and emit the changes
52+
case ChangeReason.Add or ChangeReason.Update:
53+
_cache.AddOrUpdate(change.Current, change.Key);
54+
AddChildSubscription(change.Current.Source, change.Key);
55+
if (change.Previous.HasValue)
56+
{
57+
_changeSetMergeTracker.RemoveItems(change.Previous.Value.Cache.KeyValues);
58+
}
59+
break;
60+
61+
// Shutdown the existing subscription and remove from the cache
62+
case ChangeReason.Remove:
63+
_cache.Remove(change.Key);
64+
RemoveChildSubscription(change.Key);
65+
_changeSetMergeTracker.RemoveItems(change.Current.Cache.KeyValues);
66+
break;
67+
}
68+
}
69+
}
70+
71+
protected override void ChildOnNext(IChangeSet<TDestination, TDestinationKey> changes, TKey parentKey) =>
72+
_changeSetMergeTracker.ProcessChangeSet(changes, null);
73+
74+
protected override void EmitChanges(IObserver<IChangeSet<TDestination, TDestinationKey>> observer) =>
75+
_changeSetMergeTracker.EmitChanges(observer);
76+
}
6377
}

0 commit comments

Comments
 (0)