Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Aspire.sln
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Hosting.PostgreSQL.T
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Hosting.Qdrant.Tests", "tests\Aspire.Hosting.Qdrant.Tests\Aspire.Hosting.Qdrant.Tests.csproj", "{8E2AA85E-C351-47B4-AF91-58557FAD5840}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Hosting.Kafka.Tests", "tests\Aspire.Hosting.Kafka.Tests\Aspire.Hosting.Kafka.Tests.csproj", "{0A83AA67-221E-44B4-9BA9-DC64DC17949E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1338,6 +1340,10 @@ Global
{7425E5B2-BC47-4521-AC40-B8CECA329E08}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7425E5B2-BC47-4521-AC40-B8CECA329E08}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7425E5B2-BC47-4521-AC40-B8CECA329E08}.Release|Any CPU.Build.0 = Release|Any CPU
{0A83AA67-221E-44B4-9BA9-DC64DC17949E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0A83AA67-221E-44B4-9BA9-DC64DC17949E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0A83AA67-221E-44B4-9BA9-DC64DC17949E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0A83AA67-221E-44B4-9BA9-DC64DC17949E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1583,6 +1589,7 @@ Global
{1BC02557-B78B-48CE-9D3C-488A6B7672F4} = {830A89EC-4029-4753-B25A-068BAE37DEC7}
{8E2AA85E-C351-47B4-AF91-58557FAD5840} = {830A89EC-4029-4753-B25A-068BAE37DEC7}
{7425E5B2-BC47-4521-AC40-B8CECA329E08} = {830A89EC-4029-4753-B25A-068BAE37DEC7}
{0A83AA67-221E-44B4-9BA9-DC64DC17949E} = {830A89EC-4029-4753-B25A-068BAE37DEC7}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6DCEDFEC-988E-4CB3-B45B-191EB5086E0C}
Expand Down
4 changes: 1 addition & 3 deletions tests/Aspire.EndToEnd.Tests/IntegrationServicesFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ public Task DumpComponentLogsAsync(TestResourceNames resource, ITestOutputHelper
TestResourceNames.cosmos or TestResourceNames.efcosmos => "cosmos",
TestResourceNames.eventhubs => "eventhubs",
TestResourceNames.garnet => "garnet",
TestResourceNames.kafka => "kafka",
TestResourceNames.milvus => "milvus",
TestResourceNames.mongodb => "mongodb",
TestResourceNames.mysql or TestResourceNames.efmysql => "mysql",
Expand Down Expand Up @@ -151,8 +150,7 @@ private static TestResourceNames GetResourcesToSkip()
"oracle" => TestResourceNames.oracledatabase,
"cosmos" => TestResourceNames.cosmos | TestResourceNames.efcosmos,
"eventhubs" => TestResourceNames.eventhubs,
"basicservices" => TestResourceNames.kafka
| TestResourceNames.mongodb
"basicservices" => TestResourceNames.mongodb
| TestResourceNames.rabbitmq
| TestResourceNames.redis
| TestResourceNames.garnet
Expand Down
17 changes: 0 additions & 17 deletions tests/Aspire.EndToEnd.Tests/IntegrationServicesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,6 @@ public Task VerifyCosmosComponentWorks(TestResourceNames resourceName)
return VerifyComponentWorks(resourceName);
}

[Fact]
[Trait("scenario", "basicservices")]
public Task KafkaComponentCanProduceAndConsume()
=> RunTestAsync(async() =>
{
_integrationServicesFixture.EnsureAppHasResources(TestResourceNames.kafka);
string topic = $"topic-{Guid.NewGuid()}";

var response = await _integrationServicesFixture.IntegrationServiceA.HttpGetAsync("http", $"/kafka/produce/{topic}");
var responseContent = await response.Content.ReadAsStringAsync();
Assert.True(response.IsSuccessStatusCode, responseContent);

response = await _integrationServicesFixture.IntegrationServiceA.HttpGetAsync("http", $"/kafka/consume/{topic}");
responseContent = await response.Content.ReadAsStringAsync();
Assert.True(response.IsSuccessStatusCode, responseContent);
});

[Fact]
// Include all the scenarios here so this test gets run for all of them.
[Trait("scenario", "cosmos")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Net.Sockets;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Utils;
using Microsoft.Extensions.DependencyInjection;
using System.Net.Sockets;
using Xunit;

namespace Aspire.Hosting.Tests.Kafka;
namespace Aspire.Hosting.Kafka.Tests;

public class AddKafkaTests
{
[Fact]
Expand Down
18 changes: 18 additions & 0 deletions tests/Aspire.Hosting.Kafka.Tests/Aspire.Hosting.Kafka.Tests.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>$(NetCurrent)</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Aspire.Hosting.Kafka\Aspire.Hosting.Kafka.csproj" />
<ProjectReference Include="..\..\src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj" />
<ProjectReference Include="..\Aspire.Hosting.Tests\Aspire.Hosting.Tests.csproj" />
</ItemGroup>

<ItemGroup>
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Kafka\KafkaContainerImageTags.cs" />
<Compile Include="$(SharedDir)VolumeNameGenerator.cs" Link="Utils\VolumeNameGenerator.cs" />
</ItemGroup>

</Project>
211 changes: 211 additions & 0 deletions tests/Aspire.Hosting.Kafka.Tests/KafkaFunctionalTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Components.Common.Tests;
using Aspire.Hosting.Utils;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;

namespace Aspire.Hosting.Kafka.Tests;

public class KafkaFunctionalTests(ITestOutputHelper testOutputHelper)
{
[Fact]
[RequiresDocker]
public async Task VerifyKafkaResource()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));

var builder = CreateDistributedApplicationBuilder();

var kafka = builder.AddKafka("kafka");

using var app = builder.Build();

await app.StartAsync();

var hb = Host.CreateApplicationBuilder();

hb.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
{
[$"ConnectionStrings:{kafka.Resource.Name}"] = await kafka.Resource.ConnectionStringExpression.GetValueAsync(default)
});

hb.AddKafkaProducer<string, string>("kafka");
hb.AddKafkaConsumer<string, string>("kafka", consumerBuilder =>
{
consumerBuilder.Config.GroupId = "aspire-consumer-group";
consumerBuilder.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
});

using var host = hb.Build();

await host.StartAsync();

var topic = "test-topic";
var producer = host.Services.GetRequiredService<IProducer<string, string>>();
for (var i = 0; i < 10; i++)
{
await producer.ProduceAsync(topic, new Message<string, string> { Key = "test-key", Value = $"test-value{i}" });
}

var consumer = host.Services.GetRequiredService<IConsumer<string, string>>();
consumer.Subscribe(topic);
for (var i = 0; i < 10; i++)
{
var result = consumer.Consume(cts.Token);

Assert.Equal($"test-key", result.Message.Key);
Assert.Equal($"test-value{i}", result.Message.Value);
}
}

[Theory]
[ActiveIssue("https://github.com/dotnet/aspire/issues/4909")]
[InlineData(true)]
[InlineData(false)]
[RequiresDocker]
public async Task WithDataShouldPersistStateBetweenUsages(bool useVolume)
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
var topic = "test-topic";
string? volumeName = null;
string? bindMountPath = null;

try
{
var builder1 = CreateDistributedApplicationBuilder();
var kafka1 = builder1.AddKafka("kafka");

if (useVolume)
{
// Use a deterministic volume name to prevent them from exhausting the machines if deletion fails
volumeName = VolumeNameGenerator.CreateVolumeName(kafka1, nameof(WithDataShouldPersistStateBetweenUsages));

// if the volume already exists (because of a crashing previous run), try to delete it
DockerUtils.AttemptDeleteDockerVolume(volumeName);
kafka1.WithDataVolume(volumeName);
}
else
{
bindMountPath = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
kafka1.WithDataBindMount(bindMountPath);
}

using (var app = builder1.Build())
{
await app.StartAsync();
try
{
var hb = Host.CreateApplicationBuilder();

hb.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
{
[$"ConnectionStrings:{kafka1.Resource.Name}"] = await kafka1.Resource.ConnectionStringExpression.GetValueAsync(default)
});

hb.AddKafkaProducer<string, string>("kafka");

using (var host = hb.Build())
{
await host.StartAsync();

var producer = host.Services.GetRequiredService<IProducer<string, string>>();
for (var i = 0; i < 10; i++)
{
await producer.ProduceAsync(topic, new Message<string, string> { Key = "test-key", Value = $"test-value{i}" });
}
}
}
finally
{
// Stops the container, or the Volume/mount would still be in use
await app.StopAsync();
}
}

var builder2 = CreateDistributedApplicationBuilder();
var kafka2 = builder2.AddKafka("kafka");

if (useVolume)
{
kafka2.WithDataVolume(volumeName);
}
else
{
kafka2.WithDataBindMount(bindMountPath!);
}

using (var app = builder2.Build())
{
await app.StartAsync();
try
{
var hb = Host.CreateApplicationBuilder();

hb.Configuration.AddInMemoryCollection(new Dictionary<string, string?>
{
[$"ConnectionStrings:{kafka2.Resource.Name}"] = await kafka2.Resource.ConnectionStringExpression.GetValueAsync(default)
});

hb.AddKafkaConsumer<string, string>("kafka", consumerBuilder =>
{
consumerBuilder.Config.GroupId = "aspire-consumer-group";
consumerBuilder.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
});

using (var host = hb.Build())
{
await host.StartAsync();

var consumer = host.Services.GetRequiredService<IConsumer<string, string>>();
consumer.Subscribe(topic);
for (var i = 0; i < 10; i++)
{
var result = consumer.Consume(cts.Token);

Assert.Equal($"test-key", result.Message.Key);
Assert.Equal($"test-value{i}", result.Message.Value);
}
}
}
finally
{
// Stops the container, or the Volume/mount would still be in use
await app.StopAsync();
}
}
}
finally
{
if (volumeName is not null)
{
DockerUtils.AttemptDeleteDockerVolume(volumeName);
}

if (bindMountPath is not null)
{
try
{
File.Delete(bindMountPath);
}
catch
{
// Don't fail test if we can't clean the temporary folder
}
}
}
}

private TestDistributedApplicationBuilder CreateDistributedApplicationBuilder()
{
var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry();
builder.Services.AddXunitLogging(testOutputHelper);
return builder;
}
}
2 changes: 0 additions & 2 deletions tests/Aspire.Hosting.Tests/Aspire.Hosting.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
<ProjectReference Include="..\..\src\Aspire.Hosting.MongoDB\Aspire.Hosting.MongoDB.csproj" IsAspireProjectResource="false" />
<ProjectReference Include="..\..\src\Aspire.Hosting.MySql\Aspire.Hosting.MySql.csproj" IsAspireProjectResource="false" />
<ProjectReference Include="..\..\src\Aspire.Hosting.Nats\Aspire.Hosting.Nats.csproj" IsAspireProjectResource="false" />
<ProjectReference Include="..\..\src\Aspire.Hosting.Qdrant\Aspire.Hosting.Qdrant.csproj" IsAspireProjectResource="false" />
<ProjectReference Include="..\..\src\Aspire.Hosting.Testing\Aspire.Hosting.Testing.csproj" />
<ProjectReference Include="..\..\src\Aspire.Hosting.Python\Aspire.Hosting.Python.csproj" IsAspireProjectResource="false" />
<ProjectReference Include="..\..\src\Aspire.Hosting.Valkey\Aspire.Hosting.Valkey.csproj" IsAspireProjectResource="false" />
Expand All @@ -55,7 +54,6 @@

<Compile Include="$(TestsSharedDir)Logging\*.cs" LinkBase="shared/Logging" />
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Garnet\GarnetContainerImageTags.cs" />
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Kafka\KafkaContainerImageTags.cs" />
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Nats\NatsContainerImageTags.cs" />
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Milvus\MilvusContainerImageTags.cs" />
<Compile Include="$(RepoRoot)src\Aspire.Hosting.MongoDB\MongoDBContainerImageTags.cs" />
Expand Down
44 changes: 0 additions & 44 deletions tests/Aspire.Hosting.Tests/ManifestGenerationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -306,25 +306,6 @@ public void EnsureAllRabbitMQManifestTypesHaveVersion0Suffix()
Assert.Equal("container.v0", server.GetProperty("type").GetString());
}

[Fact]
public void EnsureAllKafkaManifestTypesHaveVersion0Suffix()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this test adds value over the AddKafkaTests.VerifyManifest test. That test verifies it is a container.v0 resource.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok :)

{
using var program = CreateTestProgramJsonDocumentManifestPublisher();

program.AppBuilder.AddKafka("kafkacontainer");

// Build AppHost so that publisher can be resolved.
program.Build();
var publisher = program.GetManifestPublisher();

program.Run();

var resources = publisher.ManifestDocument.RootElement.GetProperty("resources");

var server = resources.GetProperty("kafkacontainer");
Assert.Equal("container.v0", server.GetProperty("type").GetString());
}

[Fact]
public void NodeAppIsExecutableResource()
{
Expand Down Expand Up @@ -508,7 +489,6 @@ public void VerifyTestProgramFullManifest()
"ConnectionStrings__rabbitmq": "{rabbitmq.connectionString}",
"ConnectionStrings__mymongodb": "{mymongodb.connectionString}",
"ConnectionStrings__freepdb1": "{freepdb1.connectionString}",
"ConnectionStrings__kafka": "{kafka.connectionString}",
"ConnectionStrings__cosmos": "{cosmos.connectionString}",
"ConnectionStrings__eventhubns": "{eventhubns.connectionString}",
"ConnectionStrings__milvus": "{milvus.connectionString}"
Expand Down Expand Up @@ -685,30 +665,6 @@ public void VerifyTestProgramFullManifest()
"type": "value.v0",
"connectionString": "{oracledatabase.connectionString}/freepdb1"
},
"kafka": {
"type": "container.v0",
"connectionString": "{kafka.bindings.tcp.host}:{kafka.bindings.tcp.port}",
"image": "{{TestConstants.AspireTestContainerRegistry}}/{{KafkaContainerImageTags.Image}}:{{KafkaContainerImageTags.Tag}}",
"env": {
"KAFKA_LISTENERS": "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT",
"KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://{kafka.bindings.tcp.host}:29092,PLAINTEXT_HOST://{kafka.bindings.tcp.host}:{kafka.bindings.tcp.port},PLAINTEXT_INTERNAL://{kafka.bindings.internal.host}:{kafka.bindings.internal.port}"
},
"bindings": {
"tcp": {
"scheme": "tcp",
"protocol": "tcp",
"transport": "tcp",
"targetPort": 9092
},
"internal": {
"scheme": "tcp",
"protocol": "tcp",
"transport": "tcp",
"targetPort": 9093
}
}
},
"cosmos": {
"type": "azure.bicep.v0",
"connectionString": "{cosmos.secretOutputs.connectionString}",
Expand Down
Loading