Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/Aspire.Hosting/Dcp/ApplicationExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2088,14 +2088,16 @@ async Task StartExecutableOrContainerAsync<T>(T resource) where T : CustomResour

// Ensure resource is deleted. DeleteAsync returns before the resource is completely deleted so we must poll
// to discover when it is safe to recreate the resource. This is required because the resources share the same name.
// Deleting a resource might take a while (more than 10 seconds), because DCP tries to gracefully shut it down first
// before resorting to more extreme measures.
if (!resourceNotFound)
{
var ensureDeleteRetryStrategy = new RetryStrategyOptions()
{
BackoffType = DelayBackoffType.Linear,
MaxDelay = TimeSpan.FromSeconds(0.5),
BackoffType = DelayBackoffType.Exponential,
Delay = TimeSpan.FromMilliseconds(200),
UseJitter = true,
MaxRetryAttempts = 5,
MaxRetryAttempts = 6, // Cumulative time for all attempts amounts to about 12 seconds
ShouldHandle = new PredicateBuilder().Handle<Exception>(),
OnRetry = (retry) =>
{
Expand Down
19 changes: 17 additions & 2 deletions src/Aspire.Hosting/Dcp/KubernetesService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Runtime.CompilerServices;
using Aspire.Hosting.Dcp.Model;
using k8s;
using k8s.Autorest;
using k8s.Exceptions;
using k8s.Models;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -90,6 +91,7 @@ public Task<T> GetAsync<T>(string name, string? namespaceParameter = null, Cance

return KubernetesJson.Deserialize<T>(response.Body.ToString());
},
RetryOnConnectivityAndConflictErrors,
cancellationToken);
}

Expand Down Expand Up @@ -122,6 +124,7 @@ public Task<T> CreateAsync<T>(T obj, CancellationToken cancellationToken = defau

return KubernetesJson.Deserialize<T>(response.Body.ToString());
},
RetryOnConnectivityErrors,
cancellationToken);
}

Expand Down Expand Up @@ -156,6 +159,7 @@ public Task<T> PatchAsync<T>(T obj, V1Patch patch, CancellationToken cancellatio

return KubernetesJson.Deserialize<T>(response.Body.ToString());
},
RetryOnConnectivityErrors,
cancellationToken);
}

Expand Down Expand Up @@ -185,6 +189,7 @@ public Task<List<T>> ListAsync<T>(string? namespaceParameter = null, Cancellatio

return KubernetesJson.Deserialize<CustomResourceList<T>>(response.Body.ToString()).Items;
},
RetryOnConnectivityAndConflictErrors,
cancellationToken);
}

Expand Down Expand Up @@ -216,6 +221,7 @@ public Task<T> DeleteAsync<T>(string name, string? namespaceParameter = null, Ca

return KubernetesJson.Deserialize<T>(response.Body.ToString());
},
RetryOnConnectivityAndConflictErrors,
cancellationToken);
}

Expand Down Expand Up @@ -248,6 +254,7 @@ public Task<T> DeleteAsync<T>(string name, string? namespaceParameter = null, Ca

return responseTask.WatchAsync<T, object>(null, cancellationToken);
},
RetryOnConnectivityAndConflictErrors,
cancellationToken).ConfigureAwait(false);

await foreach (var item in result.ConfigureAwait(false))
Expand Down Expand Up @@ -290,6 +297,7 @@ public Task<Stream> GetLogStreamAsync<T>(

return response.Body;
},
RetryOnConnectivityAndConflictErrors,
cancellationToken
);
}
Expand All @@ -315,19 +323,22 @@ private Task<TResult> ExecuteWithRetry<TResult>(
DcpApiOperationType operationType,
string resourceType,
Func<DcpKubernetesClient, TResult> operation,
Func<Exception, bool> isRetryable,
CancellationToken cancellationToken)
{
return ExecuteWithRetry<TResult>(
operationType,
resourceType,
(DcpKubernetesClient kubernetes) => Task.FromResult(operation(kubernetes)),
isRetryable,
cancellationToken);
}

private async Task<TResult> ExecuteWithRetry<TResult>(
DcpApiOperationType operationType,
string resourceType,
Func<DcpKubernetesClient, Task<TResult>> operation,
Func<Exception, bool> isRetryable,
CancellationToken cancellationToken)
{
var currentTimestamp = DateTime.UtcNow;
Expand All @@ -344,7 +355,7 @@ private async Task<TResult> ExecuteWithRetry<TResult>(
await EnsureKubernetesAsync(cancellationToken).ConfigureAwait(false);
return await operation(_kubernetes!).ConfigureAwait(false);
}
catch (Exception e) when (IsRetryable(e))
catch (Exception e) when (isRetryable(e))
{
if (DateTime.UtcNow.Subtract(currentTimestamp) > MaxRetryDuration)
{
Expand All @@ -364,7 +375,11 @@ private async Task<TResult> ExecuteWithRetry<TResult>(
}
}

private static bool IsRetryable(Exception ex) => ex is HttpRequestException || ex is KubeConfigException;
private static bool RetryOnConnectivityErrors(Exception ex) => ex is HttpRequestException || ex is KubeConfigException;
private static bool RetryOnConnectivityAndConflictErrors(Exception ex) =>
ex is HttpRequestException ||
ex is KubeConfigException ||
(ex is HttpOperationException hoe && hoe.Response.StatusCode == System.Net.HttpStatusCode.Conflict);

private ResiliencePipeline GetReadKubeconfigResiliencePipeline()
{
Expand Down
Loading