Skip to content

Commit 04e2ff8

Browse files
authored
Feature: GroupOnObservable Operator (#847)
New Group Operator that groups values based on the latest value returned from an observable created using the given factory function. - Values are not observed downstream until their Observable fires once - Any event after the first will have the effect of moving the value from one group to another - Any errors in the grouping observable will bring down the entire stream - If the source observable completes, the downstream sequence will also complete (and the sequence from each Group will complete). - If the number of items in a group drops to zero, the group will be removed and any subscribers to the Group's Cache will see OnComplete - If the number of items in a group drops to zero, but within the same changeset, another item gets added to that group, the group will NOT be removed/re-added.
1 parent 7922e03 commit 04e2ff8

File tree

10 files changed

+875
-3
lines changed

10 files changed

+875
-3
lines changed

src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,6 +1367,14 @@ namespace DynamicData
13671367
where TObject : notnull
13681368
where TKey : notnull
13691369
where TGroupKey : notnull { }
1370+
public static System.IObservable<DynamicData.IGroupChangeSet<TObject, TKey, TGroupKey>> GroupOnObservable<TObject, TKey, TGroupKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Func<TObject, System.IObservable<TGroupKey>> groupObservableSelector)
1371+
where TObject : notnull
1372+
where TKey : notnull
1373+
where TGroupKey : notnull { }
1374+
public static System.IObservable<DynamicData.IGroupChangeSet<TObject, TKey, TGroupKey>> GroupOnObservable<TObject, TKey, TGroupKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Func<TObject, TKey, System.IObservable<TGroupKey>> groupObservableSelector)
1375+
where TObject : notnull
1376+
where TKey : notnull
1377+
where TGroupKey : notnull { }
13701378
public static System.IObservable<DynamicData.IGroupChangeSet<TObject, TKey, TGroupKey>> GroupOnProperty<TObject, TKey, TGroupKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Linq.Expressions.Expression<System.Func<TObject, TGroupKey>> propertySelector, System.TimeSpan? propertyChangedThrottle = default, System.Reactive.Concurrency.IScheduler? scheduler = null)
13711379
where TObject : System.ComponentModel.INotifyPropertyChanged
13721380
where TKey : notnull
@@ -2869,6 +2877,21 @@ namespace DynamicData.Tests
28692877
public void Dispose() { }
28702878
protected virtual void Dispose(bool isDisposing) { }
28712879
}
2880+
public class GroupChangeSetAggregator<TObject, TKey, TGroupKey> : System.IDisposable
2881+
where TObject : notnull
2882+
where TKey : notnull
2883+
where TGroupKey : notnull
2884+
{
2885+
public GroupChangeSetAggregator(System.IObservable<DynamicData.IGroupChangeSet<TObject, TKey, TGroupKey>> source) { }
2886+
public DynamicData.IObservableCache<DynamicData.IGroup<TObject, TKey, TGroupKey>, TGroupKey> Data { get; }
2887+
public System.Exception? Error { get; }
2888+
public DynamicData.IObservableCache<DynamicData.Tests.ChangeSetAggregator<TObject, TKey>, TGroupKey> Groups { get; }
2889+
public bool IsCompleted { get; }
2890+
public System.Collections.Generic.IReadOnlyList<DynamicData.IGroupChangeSet<TObject, TKey, TGroupKey>> Messages { get; }
2891+
public DynamicData.Diagnostics.ChangeSummary Summary { get; }
2892+
public void Dispose() { }
2893+
protected virtual void Dispose(bool disposing) { }
2894+
}
28722895
public static class ListTextEx
28732896
{
28742897
public static DynamicData.Tests.ChangeSetAggregator<T> AsAggregator<T>(this System.IObservable<DynamicData.IChangeSet<T>> source)
@@ -2914,6 +2937,10 @@ namespace DynamicData.Tests
29142937
public static DynamicData.Tests.VirtualChangeSetAggregator<TObject, TKey> AsAggregator<TObject, TKey>(this System.IObservable<DynamicData.IVirtualChangeSet<TObject, TKey>> source)
29152938
where TObject : notnull
29162939
where TKey : notnull { }
2940+
public static DynamicData.Tests.GroupChangeSetAggregator<TValue, TKey, TGroupKey> AsAggregator<TValue, TKey, TGroupKey>(this System.IObservable<DynamicData.IGroupChangeSet<TValue, TKey, TGroupKey>> source)
2941+
where TValue : notnull
2942+
where TKey : notnull
2943+
where TGroupKey : notnull { }
29172944
}
29182945
public class VirtualChangeSetAggregator<TObject, TKey> : System.IDisposable
29192946
where TObject : notnull
Lines changed: 345 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,345 @@
1+
using System;
2+
using System.Linq;
3+
using Bogus;
4+
using DynamicData.Tests.Domain;
5+
using DynamicData.Binding;
6+
using System.Reactive.Linq;
7+
using FluentAssertions;
8+
using Xunit;
9+
10+
using Person = DynamicData.Tests.Domain.Person;
11+
using System.Threading.Tasks;
12+
using DynamicData.Kernel;
13+
14+
namespace DynamicData.Tests.Cache;
15+
16+
public class GroupOnObservableFixture : IDisposable
17+
{
18+
#if DEBUG
19+
private const int InitialCount = 7;
20+
private const int AddCount = 5;
21+
private const int RemoveCount = 3;
22+
private const int UpdateCount = 2;
23+
#else
24+
private const int InitialCount = 103;
25+
private const int AddCount = 53;
26+
private const int RemoveCount = 37;
27+
private const int UpdateCount = 101;
28+
#endif
29+
private readonly SourceCache<Person, string> _personCache = new (p => p.UniqueKey);
30+
private readonly ChangeSetAggregator<Person, string> _personResults;
31+
private readonly GroupChangeSetAggregator<Person, string, Color> _favoriteColorResults;
32+
private readonly Faker<Person> _personFaker;
33+
private readonly Randomizer _randomizer = new(0x3141_5926);
34+
35+
public GroupOnObservableFixture()
36+
{
37+
_personFaker = Fakers.Person.Clone().WithSeed(_randomizer);
38+
_personResults = _personCache.Connect().AsAggregator();
39+
_favoriteColorResults = _personCache.Connect().GroupOnObservable(CreateFavoriteColorObservable).AsAggregator();
40+
}
41+
42+
[Fact]
43+
public void ResultContainsAllInitialChildren()
44+
{
45+
// Arrange
46+
47+
// Act
48+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
49+
50+
// Assert
51+
_personResults.Data.Count.Should().Be(InitialCount);
52+
_personResults.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset");
53+
VerifyGroupingResults();
54+
}
55+
56+
[Fact]
57+
public void ResultContainsAddedValues()
58+
{
59+
// Arrange
60+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
61+
62+
// Act
63+
_personCache.AddOrUpdate(_personFaker.Generate(AddCount));
64+
65+
// Assert
66+
_personResults.Data.Count.Should().Be(InitialCount + AddCount);
67+
_personResults.Messages.Count.Should().Be(2, "Initial Adds and then the subsequent Additions should each be a single message");
68+
VerifyGroupingResults();
69+
}
70+
71+
[Fact]
72+
public void ResultDoesNotContainRemovedValues()
73+
{
74+
// Arrange
75+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
76+
77+
// Act
78+
_personCache.RemoveKeys(_randomizer.ListItems(_personCache.Items.ToList(), RemoveCount).Select(p => p.UniqueKey));
79+
80+
// Assert
81+
_personResults.Data.Count.Should().Be(InitialCount - RemoveCount);
82+
_personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes");
83+
VerifyGroupingResults();
84+
}
85+
86+
[Fact]
87+
public void ResultContainsUpdatedValues()
88+
{
89+
// Arrange
90+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
91+
var replacements = _randomizer.ListItems(_personCache.Items.ToList(), UpdateCount)
92+
.Select(replacePerson => Person.CloneUniqueId(_personFaker.Generate(), replacePerson));
93+
94+
// Act
95+
_personCache.AddOrUpdate(replacements);
96+
97+
// Assert
98+
_personResults.Data.Count.Should().Be(InitialCount, "Only replacements were made");
99+
_personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Updates");
100+
VerifyGroupingResults();
101+
}
102+
103+
[Fact]
104+
public void GroupRemovedWhenEmpty()
105+
{
106+
// Arrange
107+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
108+
var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList();
109+
var removeColor = _randomizer.ListItem(usedColorList);
110+
var colorCount = usedColorList.Count;
111+
112+
// Act
113+
_personCache.Edit(updater => updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey)));
114+
115+
// Assert
116+
_personCache.Items.Select(p => p.FavoriteColor).Distinct().Count().Should().Be(colorCount - 1);
117+
_personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes");
118+
_favoriteColorResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes");
119+
_favoriteColorResults.Summary.Overall.Adds.Should().Be(colorCount);
120+
_favoriteColorResults.Summary.Overall.Removes.Should().Be(1);
121+
VerifyGroupingResults();
122+
}
123+
124+
[Fact]
125+
public void GroupNotRemovedIfAddedBackImmediately()
126+
{
127+
// Arrange
128+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
129+
var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList();
130+
var removeColor = _randomizer.ListItem(usedColorList);
131+
var colorCount = usedColorList.Count;
132+
133+
// Act
134+
_personCache.Edit(updater =>
135+
{
136+
updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey));
137+
var newPerson = _personFaker.Generate();
138+
newPerson.FavoriteColor = removeColor;
139+
updater.AddOrUpdate(newPerson);
140+
});
141+
142+
// Assert
143+
_personCache.Items.Select(p => p.FavoriteColor).Distinct().Count().Should().Be(colorCount);
144+
_personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Other Added Value");
145+
_favoriteColorResults.Messages.Count.Should().Be(1, "Shouldn't be removed/re-added");
146+
_favoriteColorResults.Summary.Overall.Adds.Should().Be(colorCount);
147+
_favoriteColorResults.Summary.Overall.Removes.Should().Be(0);
148+
VerifyGroupingResults();
149+
}
150+
151+
[Fact]
152+
public void GroupingSequenceCompletesWhenEmpty()
153+
{
154+
// Arrange
155+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
156+
var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList();
157+
var removeColor = _randomizer.ListItem(usedColorList);
158+
159+
var results = _personCache.Connect().GroupOnObservable(CreateFavoriteColorObservable)
160+
.Filter(grp => grp.Key == removeColor)
161+
.Take(1)
162+
.MergeMany(grp => grp.Cache.Connect())
163+
.AsAggregator();
164+
165+
// Act
166+
_personCache.Edit(updater => updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey)));
167+
168+
// Assert
169+
results.IsCompleted.Should().BeTrue();
170+
VerifyGroupingResults();
171+
}
172+
173+
[Fact]
174+
public void AllSequencesCompleteWhenSourceIsDisposed()
175+
{
176+
// Arrange
177+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
178+
179+
var results = _personCache.Connect().GroupOnObservable(CreateFavoriteColorObservable)
180+
.MergeMany(grp => grp.Cache.Connect())
181+
.AsAggregator();
182+
183+
// Act
184+
_personCache.Dispose();
185+
186+
// Assert
187+
results.IsCompleted.Should().BeTrue();
188+
VerifyGroupingResults();
189+
}
190+
191+
[Fact]
192+
public void AllGroupsRemovedWhenCleared()
193+
{
194+
// Arrange
195+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
196+
var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList();
197+
var colorCount = usedColorList.Count;
198+
199+
// Act
200+
_personCache.Clear();
201+
202+
// Assert
203+
_personCache.Items.Count().Should().Be(0);
204+
_personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes");
205+
_favoriteColorResults.Summary.Overall.Adds.Should().Be(colorCount);
206+
_favoriteColorResults.Summary.Overall.Removes.Should().Be(colorCount);
207+
VerifyGroupingResults();
208+
}
209+
210+
[Fact]
211+
public void ResultsContainsCorrectRegroupedValues()
212+
{
213+
// Arrange
214+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
215+
216+
// Act
217+
Enumerable.Range(0, UpdateCount).ForEach(_ => RandomFavoriteColorChange());
218+
219+
// Assert
220+
VerifyGroupingResults();
221+
}
222+
223+
[Fact]
224+
public async Task ResultsContainsCorrectRegroupedValuesAsync()
225+
{
226+
// Arrange
227+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
228+
var tasks = Enumerable.Range(0, UpdateCount).Select(_ => Task.Run(RandomFavoriteColorChange));
229+
230+
// Act
231+
await Task.WhenAll(tasks.ToArray());
232+
233+
// Assert
234+
VerifyGroupingResults();
235+
}
236+
237+
[Theory]
238+
[InlineData(false)]
239+
[InlineData(true)]
240+
public void ResultCompletesOnlyWhenSourceCompletes(bool completeSource)
241+
{
242+
// Arrange
243+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
244+
245+
// Act
246+
if (completeSource)
247+
{
248+
_personCache.Dispose();
249+
}
250+
251+
// Assert
252+
_personResults.IsCompleted.Should().Be(completeSource);
253+
}
254+
255+
[Fact]
256+
public void ResultFailsIfSourceFails()
257+
{
258+
// Arrange
259+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
260+
var expectedError = new Exception("Expected");
261+
var throwObservable = Observable.Throw<IChangeSet<Person, string>>(expectedError);
262+
using var results = _personCache.Connect().Concat(throwObservable).GroupOnObservable(CreateFavoriteColorObservable).AsAggregator();
263+
264+
// Act
265+
_personCache.Dispose();
266+
267+
// Assert
268+
results.Error.Should().Be(expectedError);
269+
}
270+
271+
[Fact]
272+
public void ResultFailsIfGroupObservableFails()
273+
{
274+
// Arrange
275+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
276+
var expectedError = new Exception("Expected");
277+
var throwObservable = Observable.Throw<Color>(expectedError);
278+
279+
// Act
280+
using var results = _personCache.Connect().GroupOnObservable((person, key) => CreateFavoriteColorObservable(person, key).Take(1).Concat(throwObservable)).AsAggregator();
281+
282+
// Assert
283+
results.Error.Should().Be(expectedError);
284+
}
285+
286+
[Fact]
287+
public void OnErrorFiresIfSelectorThrows()
288+
{
289+
// Arrange
290+
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
291+
var expectedError = new Exception("Expected");
292+
293+
// Act
294+
using var results = _personCache.Connect().GroupOnObservable<Person, string, Color>(_ => throw expectedError).AsAggregator();
295+
296+
// Assert
297+
results.Error.Should().Be(expectedError);
298+
}
299+
300+
public void Dispose()
301+
{
302+
_favoriteColorResults.Dispose();
303+
_personResults.Dispose();
304+
_personCache.Dispose();
305+
}
306+
307+
private void RandomFavoriteColorChange()
308+
{
309+
var person = _randomizer.ListItem(_personCache.Items.ToList());
310+
lock (person)
311+
{
312+
// Pick a new favorite color
313+
person.FavoriteColor = _randomizer.RandomColor(person.FavoriteColor);
314+
}
315+
}
316+
317+
private void VerifyGroupingResults() =>
318+
VerifyGroupingResults(_personCache, _personResults, _favoriteColorResults);
319+
320+
private static void VerifyGroupingResults(ISourceCache<Person, string> personCache, ChangeSetAggregator<Person, string> personResults, GroupChangeSetAggregator<Person, string, Color> favoriteColorResults)
321+
{
322+
var expectedPersons = personCache.Items.ToList();
323+
var expectedGroupings = personCache.Items.GroupBy(p => p.FavoriteColor).ToList();
324+
325+
// These should be subsets of each other
326+
expectedPersons.Should().BeEquivalentTo(personResults.Data.Items);
327+
favoriteColorResults.Groups.Count.Should().Be(expectedGroupings.Count);
328+
329+
// Check each group
330+
foreach (var grouping in expectedGroupings)
331+
{
332+
var color = grouping.Key;
333+
var expectedGroup = grouping.ToList();
334+
var optionalGroup = favoriteColorResults.Groups.Lookup(color);
335+
336+
optionalGroup.HasValue.Should().BeTrue();
337+
var actualGroup = optionalGroup.Value.Data.Items.ToList();
338+
339+
expectedGroup.Should().BeEquivalentTo(actualGroup);
340+
}
341+
}
342+
343+
private static IObservable<Color> CreateFavoriteColorObservable(Person person, string key) =>
344+
person.WhenPropertyChanged(p => p.FavoriteColor).Select(change => change.Value);
345+
}

0 commit comments

Comments
 (0)