Skip to content

Commit 4374062

Browse files
authored
Extract Aspire.Hosting.Kafka.Tests project (#4910)
* Extract Aspire.Hosting.Kafka.Tests project Contributes to #3185 Contributes to #4294
1 parent 4e9b1b6 commit 4374062

File tree

14 files changed

+242
-140
lines changed

14 files changed

+242
-140
lines changed

Aspire.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Hosting.PostgreSQL.T
512512
EndProject
513513
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Hosting.Qdrant.Tests", "tests\Aspire.Hosting.Qdrant.Tests\Aspire.Hosting.Qdrant.Tests.csproj", "{8E2AA85E-C351-47B4-AF91-58557FAD5840}"
514514
EndProject
515+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Hosting.Kafka.Tests", "tests\Aspire.Hosting.Kafka.Tests\Aspire.Hosting.Kafka.Tests.csproj", "{0A83AA67-221E-44B4-9BA9-DC64DC17949E}"
516+
EndProject
515517
Global
516518
GlobalSection(SolutionConfigurationPlatforms) = preSolution
517519
Debug|Any CPU = Debug|Any CPU
@@ -1338,6 +1340,10 @@ Global
13381340
{7425E5B2-BC47-4521-AC40-B8CECA329E08}.Debug|Any CPU.Build.0 = Debug|Any CPU
13391341
{7425E5B2-BC47-4521-AC40-B8CECA329E08}.Release|Any CPU.ActiveCfg = Release|Any CPU
13401342
{7425E5B2-BC47-4521-AC40-B8CECA329E08}.Release|Any CPU.Build.0 = Release|Any CPU
1343+
{0A83AA67-221E-44B4-9BA9-DC64DC17949E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
1344+
{0A83AA67-221E-44B4-9BA9-DC64DC17949E}.Debug|Any CPU.Build.0 = Debug|Any CPU
1345+
{0A83AA67-221E-44B4-9BA9-DC64DC17949E}.Release|Any CPU.ActiveCfg = Release|Any CPU
1346+
{0A83AA67-221E-44B4-9BA9-DC64DC17949E}.Release|Any CPU.Build.0 = Release|Any CPU
13411347
EndGlobalSection
13421348
GlobalSection(SolutionProperties) = preSolution
13431349
HideSolutionNode = FALSE
@@ -1583,6 +1589,7 @@ Global
15831589
{1BC02557-B78B-48CE-9D3C-488A6B7672F4} = {830A89EC-4029-4753-B25A-068BAE37DEC7}
15841590
{8E2AA85E-C351-47B4-AF91-58557FAD5840} = {830A89EC-4029-4753-B25A-068BAE37DEC7}
15851591
{7425E5B2-BC47-4521-AC40-B8CECA329E08} = {830A89EC-4029-4753-B25A-068BAE37DEC7}
1592+
{0A83AA67-221E-44B4-9BA9-DC64DC17949E} = {830A89EC-4029-4753-B25A-068BAE37DEC7}
15861593
EndGlobalSection
15871594
GlobalSection(ExtensibilityGlobals) = postSolution
15881595
SolutionGuid = {6DCEDFEC-988E-4CB3-B45B-191EB5086E0C}

tests/Aspire.EndToEnd.Tests/IntegrationServicesFixture.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ public Task DumpComponentLogsAsync(TestResourceNames resource, ITestOutputHelper
109109
TestResourceNames.cosmos or TestResourceNames.efcosmos => "cosmos",
110110
TestResourceNames.eventhubs => "eventhubs",
111111
TestResourceNames.garnet => "garnet",
112-
TestResourceNames.kafka => "kafka",
113112
TestResourceNames.milvus => "milvus",
114113
TestResourceNames.mongodb => "mongodb",
115114
TestResourceNames.mysql or TestResourceNames.efmysql => "mysql",
@@ -151,8 +150,7 @@ private static TestResourceNames GetResourcesToSkip()
151150
"oracle" => TestResourceNames.oracledatabase,
152151
"cosmos" => TestResourceNames.cosmos | TestResourceNames.efcosmos,
153152
"eventhubs" => TestResourceNames.eventhubs,
154-
"basicservices" => TestResourceNames.kafka
155-
| TestResourceNames.mongodb
153+
"basicservices" => TestResourceNames.mongodb
156154
| TestResourceNames.rabbitmq
157155
| TestResourceNames.redis
158156
| TestResourceNames.garnet

tests/Aspire.EndToEnd.Tests/IntegrationServicesTests.cs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -78,23 +78,6 @@ public Task VerifyCosmosComponentWorks(TestResourceNames resourceName)
7878
return VerifyComponentWorks(resourceName);
7979
}
8080

81-
[Fact]
82-
[Trait("scenario", "basicservices")]
83-
public Task KafkaComponentCanProduceAndConsume()
84-
=> RunTestAsync(async() =>
85-
{
86-
_integrationServicesFixture.EnsureAppHasResources(TestResourceNames.kafka);
87-
string topic = $"topic-{Guid.NewGuid()}";
88-
89-
var response = await _integrationServicesFixture.IntegrationServiceA.HttpGetAsync("http", $"/kafka/produce/{topic}");
90-
var responseContent = await response.Content.ReadAsStringAsync();
91-
Assert.True(response.IsSuccessStatusCode, responseContent);
92-
93-
response = await _integrationServicesFixture.IntegrationServiceA.HttpGetAsync("http", $"/kafka/consume/{topic}");
94-
responseContent = await response.Content.ReadAsStringAsync();
95-
Assert.True(response.IsSuccessStatusCode, responseContent);
96-
});
97-
9881
[Fact]
9982
// Include all the scenarios here so this test gets run for all of them.
10083
[Trait("scenario", "cosmos")]

tests/Aspire.Hosting.Tests/Kafka/AddKafkaTests.cs renamed to tests/Aspire.Hosting.Kafka.Tests/AddKafkaTests.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
using System.Net.Sockets;
5+
using Aspire.Hosting.ApplicationModel;
46
using Aspire.Hosting.Utils;
57
using Microsoft.Extensions.DependencyInjection;
6-
using System.Net.Sockets;
78
using Xunit;
89

9-
namespace Aspire.Hosting.Tests.Kafka;
10+
namespace Aspire.Hosting.Kafka.Tests;
11+
1012
public class AddKafkaTests
1113
{
1214
[Fact]
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>$(NetCurrent)</TargetFramework>
5+
</PropertyGroup>
6+
7+
<ItemGroup>
8+
<ProjectReference Include="..\..\src\Aspire.Hosting.Kafka\Aspire.Hosting.Kafka.csproj" />
9+
<ProjectReference Include="..\..\src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj" />
10+
<ProjectReference Include="..\Aspire.Hosting.Tests\Aspire.Hosting.Tests.csproj" />
11+
</ItemGroup>
12+
13+
<ItemGroup>
14+
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Kafka\KafkaContainerImageTags.cs" />
15+
<Compile Include="$(SharedDir)VolumeNameGenerator.cs" Link="Utils\VolumeNameGenerator.cs" />
16+
</ItemGroup>
17+
18+
</Project>
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using Aspire.Components.Common.Tests;
5+
using Aspire.Hosting.Utils;
6+
using Confluent.Kafka;
7+
using Microsoft.Extensions.Configuration;
8+
using Microsoft.Extensions.DependencyInjection;
9+
using Microsoft.Extensions.Hosting;
10+
using Microsoft.Extensions.Logging;
11+
using Xunit;
12+
using Xunit.Abstractions;
13+
14+
namespace Aspire.Hosting.Kafka.Tests;
15+
16+
public class KafkaFunctionalTests(ITestOutputHelper testOutputHelper)
17+
{
18+
[Fact]
19+
[RequiresDocker]
20+
public async Task VerifyKafkaResource()
21+
{
22+
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
23+
24+
var builder = CreateDistributedApplicationBuilder();
25+
26+
var kafka = builder.AddKafka("kafka");
27+
28+
using var app = builder.Build();
29+
30+
await app.StartAsync();
31+
32+
var hb = Host.CreateApplicationBuilder();
33+
34+
hb.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
35+
{
36+
[$"ConnectionStrings:{kafka.Resource.Name}"] = await kafka.Resource.ConnectionStringExpression.GetValueAsync(default)
37+
});
38+
39+
hb.AddKafkaProducer<string, string>("kafka");
40+
hb.AddKafkaConsumer<string, string>("kafka", consumerBuilder =>
41+
{
42+
consumerBuilder.Config.GroupId = "aspire-consumer-group";
43+
consumerBuilder.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
44+
});
45+
46+
using var host = hb.Build();
47+
48+
await host.StartAsync();
49+
50+
var topic = "test-topic";
51+
var producer = host.Services.GetRequiredService<IProducer<string, string>>();
52+
for (var i = 0; i < 10; i++)
53+
{
54+
await producer.ProduceAsync(topic, new Message<string, string> { Key = "test-key", Value = $"test-value{i}" });
55+
}
56+
57+
var consumer = host.Services.GetRequiredService<IConsumer<string, string>>();
58+
consumer.Subscribe(topic);
59+
for (var i = 0; i < 10; i++)
60+
{
61+
var result = consumer.Consume(cts.Token);
62+
63+
Assert.Equal($"test-key", result.Message.Key);
64+
Assert.Equal($"test-value{i}", result.Message.Value);
65+
}
66+
}
67+
68+
[Theory]
69+
[ActiveIssue("https://github.com/dotnet/aspire/issues/4909")]
70+
[InlineData(true)]
71+
[InlineData(false)]
72+
[RequiresDocker]
73+
public async Task WithDataShouldPersistStateBetweenUsages(bool useVolume)
74+
{
75+
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
76+
var topic = "test-topic";
77+
string? volumeName = null;
78+
string? bindMountPath = null;
79+
80+
try
81+
{
82+
var builder1 = CreateDistributedApplicationBuilder();
83+
var kafka1 = builder1.AddKafka("kafka");
84+
85+
if (useVolume)
86+
{
87+
// Use a deterministic volume name to prevent them from exhausting the machines if deletion fails
88+
volumeName = VolumeNameGenerator.CreateVolumeName(kafka1, nameof(WithDataShouldPersistStateBetweenUsages));
89+
90+
// if the volume already exists (because of a crashing previous run), try to delete it
91+
DockerUtils.AttemptDeleteDockerVolume(volumeName);
92+
kafka1.WithDataVolume(volumeName);
93+
}
94+
else
95+
{
96+
bindMountPath = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
97+
kafka1.WithDataBindMount(bindMountPath);
98+
}
99+
100+
using (var app = builder1.Build())
101+
{
102+
await app.StartAsync();
103+
try
104+
{
105+
var hb = Host.CreateApplicationBuilder();
106+
107+
hb.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
108+
{
109+
[$"ConnectionStrings:{kafka1.Resource.Name}"] = await kafka1.Resource.ConnectionStringExpression.GetValueAsync(default)
110+
});
111+
112+
hb.AddKafkaProducer<string, string>("kafka");
113+
114+
using (var host = hb.Build())
115+
{
116+
await host.StartAsync();
117+
118+
var producer = host.Services.GetRequiredService<IProducer<string, string>>();
119+
for (var i = 0; i < 10; i++)
120+
{
121+
await producer.ProduceAsync(topic, new Message<string, string> { Key = "test-key", Value = $"test-value{i}" });
122+
}
123+
}
124+
}
125+
finally
126+
{
127+
// Stops the container, or the Volume/mount would still be in use
128+
await app.StopAsync();
129+
}
130+
}
131+
132+
var builder2 = CreateDistributedApplicationBuilder();
133+
var kafka2 = builder2.AddKafka("kafka");
134+
135+
if (useVolume)
136+
{
137+
kafka2.WithDataVolume(volumeName);
138+
}
139+
else
140+
{
141+
kafka2.WithDataBindMount(bindMountPath!);
142+
}
143+
144+
using (var app = builder2.Build())
145+
{
146+
await app.StartAsync();
147+
try
148+
{
149+
var hb = Host.CreateApplicationBuilder();
150+
151+
hb.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
152+
{
153+
[$"ConnectionStrings:{kafka2.Resource.Name}"] = await kafka2.Resource.ConnectionStringExpression.GetValueAsync(default)
154+
});
155+
156+
hb.AddKafkaConsumer<string, string>("kafka", consumerBuilder =>
157+
{
158+
consumerBuilder.Config.GroupId = "aspire-consumer-group";
159+
consumerBuilder.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
160+
});
161+
162+
using (var host = hb.Build())
163+
{
164+
await host.StartAsync();
165+
166+
var consumer = host.Services.GetRequiredService<IConsumer<string, string>>();
167+
consumer.Subscribe(topic);
168+
for (var i = 0; i < 10; i++)
169+
{
170+
var result = consumer.Consume(cts.Token);
171+
172+
Assert.Equal($"test-key", result.Message.Key);
173+
Assert.Equal($"test-value{i}", result.Message.Value);
174+
}
175+
}
176+
}
177+
finally
178+
{
179+
// Stops the container, or the Volume/mount would still be in use
180+
await app.StopAsync();
181+
}
182+
}
183+
}
184+
finally
185+
{
186+
if (volumeName is not null)
187+
{
188+
DockerUtils.AttemptDeleteDockerVolume(volumeName);
189+
}
190+
191+
if (bindMountPath is not null)
192+
{
193+
try
194+
{
195+
File.Delete(bindMountPath);
196+
}
197+
catch
198+
{
199+
// Don't fail test if we can't clean the temporary folder
200+
}
201+
}
202+
}
203+
}
204+
205+
private TestDistributedApplicationBuilder CreateDistributedApplicationBuilder()
206+
{
207+
var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry();
208+
builder.Services.AddXunitLogging(testOutputHelper);
209+
return builder;
210+
}
211+
}

tests/Aspire.Hosting.Tests/Aspire.Hosting.Tests.csproj

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
<ProjectReference Include="..\..\src\Aspire.Hosting.MongoDB\Aspire.Hosting.MongoDB.csproj" IsAspireProjectResource="false" />
3535
<ProjectReference Include="..\..\src\Aspire.Hosting.MySql\Aspire.Hosting.MySql.csproj" IsAspireProjectResource="false" />
3636
<ProjectReference Include="..\..\src\Aspire.Hosting.Nats\Aspire.Hosting.Nats.csproj" IsAspireProjectResource="false" />
37-
<ProjectReference Include="..\..\src\Aspire.Hosting.Qdrant\Aspire.Hosting.Qdrant.csproj" IsAspireProjectResource="false" />
3837
<ProjectReference Include="..\..\src\Aspire.Hosting.Testing\Aspire.Hosting.Testing.csproj" />
3938
<ProjectReference Include="..\..\src\Aspire.Hosting.Python\Aspire.Hosting.Python.csproj" IsAspireProjectResource="false" />
4039
<ProjectReference Include="..\..\src\Aspire.Hosting.Valkey\Aspire.Hosting.Valkey.csproj" IsAspireProjectResource="false" />
@@ -55,7 +54,6 @@
5554

5655
<Compile Include="$(TestsSharedDir)Logging\*.cs" LinkBase="shared/Logging" />
5756
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Garnet\GarnetContainerImageTags.cs" />
58-
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Kafka\KafkaContainerImageTags.cs" />
5957
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Nats\NatsContainerImageTags.cs" />
6058
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Milvus\MilvusContainerImageTags.cs" />
6159
<Compile Include="$(RepoRoot)src\Aspire.Hosting.MongoDB\MongoDBContainerImageTags.cs" />

tests/Aspire.Hosting.Tests/ManifestGenerationTests.cs

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -306,25 +306,6 @@ public void EnsureAllRabbitMQManifestTypesHaveVersion0Suffix()
306306
Assert.Equal("container.v0", server.GetProperty("type").GetString());
307307
}
308308

309-
[Fact]
310-
public void EnsureAllKafkaManifestTypesHaveVersion0Suffix()
311-
{
312-
using var program = CreateTestProgramJsonDocumentManifestPublisher();
313-
314-
program.AppBuilder.AddKafka("kafkacontainer");
315-
316-
// Build AppHost so that publisher can be resolved.
317-
program.Build();
318-
var publisher = program.GetManifestPublisher();
319-
320-
program.Run();
321-
322-
var resources = publisher.ManifestDocument.RootElement.GetProperty("resources");
323-
324-
var server = resources.GetProperty("kafkacontainer");
325-
Assert.Equal("container.v0", server.GetProperty("type").GetString());
326-
}
327-
328309
[Fact]
329310
public void NodeAppIsExecutableResource()
330311
{
@@ -508,7 +489,6 @@ public void VerifyTestProgramFullManifest()
508489
"ConnectionStrings__rabbitmq": "{rabbitmq.connectionString}",
509490
"ConnectionStrings__mymongodb": "{mymongodb.connectionString}",
510491
"ConnectionStrings__freepdb1": "{freepdb1.connectionString}",
511-
"ConnectionStrings__kafka": "{kafka.connectionString}",
512492
"ConnectionStrings__cosmos": "{cosmos.connectionString}",
513493
"ConnectionStrings__eventhubns": "{eventhubns.connectionString}",
514494
"ConnectionStrings__milvus": "{milvus.connectionString}"
@@ -685,30 +665,6 @@ public void VerifyTestProgramFullManifest()
685665
"type": "value.v0",
686666
"connectionString": "{oracledatabase.connectionString}/freepdb1"
687667
},
688-
"kafka": {
689-
"type": "container.v0",
690-
"connectionString": "{kafka.bindings.tcp.host}:{kafka.bindings.tcp.port}",
691-
"image": "{{TestConstants.AspireTestContainerRegistry}}/{{KafkaContainerImageTags.Image}}:{{KafkaContainerImageTags.Tag}}",
692-
"env": {
693-
"KAFKA_LISTENERS": "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093",
694-
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT",
695-
"KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://{kafka.bindings.tcp.host}:29092,PLAINTEXT_HOST://{kafka.bindings.tcp.host}:{kafka.bindings.tcp.port},PLAINTEXT_INTERNAL://{kafka.bindings.internal.host}:{kafka.bindings.internal.port}"
696-
},
697-
"bindings": {
698-
"tcp": {
699-
"scheme": "tcp",
700-
"protocol": "tcp",
701-
"transport": "tcp",
702-
"targetPort": 9092
703-
},
704-
"internal": {
705-
"scheme": "tcp",
706-
"protocol": "tcp",
707-
"transport": "tcp",
708-
"targetPort": 9093
709-
}
710-
}
711-
},
712668
"cosmos": {
713669
"type": "azure.bicep.v0",
714670
"connectionString": "{cosmos.secretOutputs.connectionString}",

0 commit comments

Comments
 (0)