Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/ReverseProxy/Forwarder/ForwarderMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,13 @@ public async Task Invoke(HttpContext context)
activity?.AddTag("proxy.route_id", route.Config.RouteId);
activity?.AddTag("proxy.cluster_id", cluster.ClusterId);


if (destinations.Count == 0)
{
Log.NoAvailableDestinations(_logger, cluster.ClusterId);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
context.Features.Set<IForwarderErrorFeature>(new ForwarderErrorFeature(ForwarderError.NoAvailableDestinations, ex: null));
activity?.SetStatus(ActivityStatusCode.Error);
activity?.AddError("Proxy forwarding failed","No available destinations to route to");
activity?.AddError("Proxy forwarding failed", "No available destinations to forward to");
return;
}

Expand Down
45 changes: 21 additions & 24 deletions src/ReverseProxy/Health/ActiveHealthCheckMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private async Task ProbeCluster(ClusterState cluster)
}

// Creates an Activity to trace the active health checks
var activity = Observability.YarpActivitySource.StartActivity("proxy.cluster_health_checks", ActivityKind.Consumer);
using var activity = Observability.YarpActivitySource.StartActivity("proxy.cluster_health_checks", ActivityKind.Consumer);
activity?.AddTag("proxy.cluster_id", cluster.ClusterId);

Log.StartingActiveHealthProbingOnCluster(_logger, cluster.ClusterId);
Expand Down Expand Up @@ -157,12 +157,15 @@ private async Task ProbeCluster(ClusterState cluster)
}

Log.StoppedActiveHealthProbingOnCluster(_logger, cluster.ClusterId);
activity?.Stop();
}
}

private async Task<DestinationProbingResult> ProbeDestinationAsync(ClusterState cluster, DestinationState destination, TimeSpan timeout)
{
using var probeActivity = Observability.YarpActivitySource.StartActivity("proxy.destination_health_check", ActivityKind.Client);
probeActivity?.AddTag("proxy.cluster_id", cluster.ClusterId);
probeActivity?.AddTag("proxy.destination_id", destination.DestinationId);

HttpRequestMessage request;
try
{
Expand All @@ -172,36 +175,30 @@ private async Task<DestinationProbingResult> ProbeDestinationAsync(ClusterState
{
Log.ActiveHealthProbeConstructionFailedOnCluster(_logger, destination.DestinationId, cluster.ClusterId, ex);

probeActivity?.SetStatus(ActivityStatusCode.Error);

return new DestinationProbingResult(destination, null, ex);
}

using (var probeActivity = Observability.YarpActivitySource.StartActivity("proxy.destination_health_check", ActivityKind.Client))
using var cts = new CancellationTokenSource(timeout);

try
{
probeActivity?.AddTag("proxy.cluster_id", cluster.ClusterId);
probeActivity?.AddTag("proxy.destination_id", destination.DestinationId);
var cts = new CancellationTokenSource(timeout);
try
{
Log.SendingHealthProbeToEndpointOfDestination(_logger, request.RequestUri, destination.DestinationId, cluster.ClusterId);
var response = await cluster.Model.HttpClient.SendAsync(request, cts.Token);
Log.DestinationProbingCompleted(_logger, destination.DestinationId, cluster.ClusterId, (int)response.StatusCode);
Log.SendingHealthProbeToEndpointOfDestination(_logger, request.RequestUri, destination.DestinationId, cluster.ClusterId);
var response = await cluster.Model.HttpClient.SendAsync(request, cts.Token);
Log.DestinationProbingCompleted(_logger, destination.DestinationId, cluster.ClusterId, (int)response.StatusCode);

probeActivity?.SetStatus(ActivityStatusCode.Ok);
probeActivity?.SetStatus(ActivityStatusCode.Ok);

return new DestinationProbingResult(destination, response, null);
}
catch (Exception ex)
{
Log.DestinationProbingFailed(_logger, destination.DestinationId, cluster.ClusterId, ex);
return new DestinationProbingResult(destination, response, null);
}
catch (Exception ex)
{
Log.DestinationProbingFailed(_logger, destination.DestinationId, cluster.ClusterId, ex);

probeActivity?.SetStatus(ActivityStatusCode.Error);
probeActivity?.SetStatus(ActivityStatusCode.Error);

return new DestinationProbingResult(destination, null, ex);
}
finally
{
cts.Dispose();
}
return new DestinationProbingResult(destination, null, ex);
}
}
}
23 changes: 19 additions & 4 deletions src/ReverseProxy/Model/ProxyPipelineInitializerMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public ProxyPipelineInitializerMiddleware(RequestDelegate next,
_next = next ?? throw new ArgumentNullException(nameof(next));
}

public async Task Invoke(HttpContext context)
public Task Invoke(HttpContext context)
{
var endpoint = context.GetEndpoint()
?? throw new InvalidOperationException($"Routing Endpoint wasn't set for the current request.");
Expand All @@ -39,7 +39,7 @@ public async Task Invoke(HttpContext context)
{
Log.NoClusterFound(_logger, route.Config.RouteId);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
return;
return Task.CompletedTask;
}

var destinationsState = cluster.DestinationsState;
Expand All @@ -51,11 +51,26 @@ public async Task Invoke(HttpContext context)
AvailableDestinations = destinationsState.AvailableDestinations,
});

using (var activity = Observability.YarpActivitySource.StartActivity("proxy.forwarder", ActivityKind.Server))
var activity = Observability.YarpActivitySource.CreateActivity("proxy.forwarder", ActivityKind.Server);

return activity is null
? _next(context)
: AwaitWithActivity(context, activity);
}

private async Task AwaitWithActivity(HttpContext context, Activity activity)
{
context.SetYarpActivity(activity);

activity.Start();
try
{
context.SetYarpActivity(activity);
await _next(context);
}
finally
{
activity.Dispose();
}
}

private static class Log
Expand Down
11 changes: 3 additions & 8 deletions src/ReverseProxy/Utilities/Observability.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using Microsoft.AspNetCore.Http;

namespace Yarp.ReverseProxy.Utilities;
Expand All @@ -23,15 +17,16 @@ internal static class Observability

public static void SetYarpActivity(this HttpContext context, Activity? activity)
{
if (activity != null)
if (activity is not null)
{
context.Features[typeof(YarpActivity)] = activity;
}
}

public static void AddError(this Activity activity, string message, string description)
{
if (activity != null) {
if (activity is not null)
{
var tagsCollection = new ActivityTagsCollection
{
{ "error", message },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected override void OnEvent(IHttpTelemetryConsumer[] consumers, EventWritten
break;

case 3:
Debug.Assert(eventData.EventName == "RequestFailed" && payload.Count == 0);
Debug.Assert(eventData.EventName == "RequestFailed" /* && payload.Count == 0 */);
{
foreach (var consumer in consumers)
{
Expand Down