Skip to content

Commit 7d2ac99

Browse files
committed
Fix timing issue in parallel execution (microsoft#4629)
1 parent a10f2dd commit 7d2ac99

File tree

4 files changed

+118
-49
lines changed

4 files changed

+118
-49
lines changed

src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelOperationManager.cs

Lines changed: 51 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@
99

1010
using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client.Parallel;
1111
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
12+
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Client;
1213

1314
namespace Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client;
1415

@@ -63,6 +64,8 @@ public ParallelOperationManager(Func<TestRuntimeProviderInfo, TWorkload, TManage
6364

6465
private void ClearSlots(bool acceptMoreWork)
6566
{
67+
EqtTrace.Verbose($"ParallelOperationManager.ClearSlots: Clearing all slots. Slots should accept more work: {acceptMoreWork}");
68+
6669
lock (_lock)
6770
{
6871
_acceptMoreWork = acceptMoreWork;
@@ -76,6 +79,13 @@ private void SetOccupiedSlotCount()
7679
{
7780
AvailableSlotCount = _managerSlots.Count(s => !s.HasWork);
7881
OccupiedSlotCount = _managerSlots.Count - AvailableSlotCount;
82+
83+
if (EqtTrace.IsVerboseEnabled)
84+
{
85+
EqtTrace.Verbose($"ParallelOperationManager.SetOccupiedSlotCount: Setting slot counts AvailableSlotCount = {AvailableSlotCount}, OccupiedSlotCount = {OccupiedSlotCount}.");
86+
EqtTrace.Verbose($"Occupied slots:\n{(string.Join("\n", _managerSlots.Where(s => s.HasWork).Select((slot) => $"{slot.Index}: {GetSourcesForSlotExpensive(slot)}").ToArray()))}");
87+
88+
}
7989
}
8090

8191
public void StartWork(
@@ -91,6 +101,7 @@ public void StartWork(
91101
_initializeWorkload = initializeWorkload ?? throw new ArgumentNullException(nameof(initializeWorkload));
92102
_runWorkload = runWorkload ?? throw new ArgumentNullException(nameof(runWorkload));
93103

104+
EqtTrace.Verbose($"ParallelOperationManager.StartWork: Starting adding {workloads.Count} workloads.");
94105
_workloads.AddRange(workloads);
95106

96107
ClearSlots(acceptMoreWork: true);
@@ -123,7 +134,10 @@ private bool RunWorkInParallel()
123134
// so when it is allowed to enter it will try to add more work, but we already cancelled,
124135
// so we should not start more work.
125136
if (!_acceptMoreWork)
137+
{
138+
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We don't accept more work, returning false.");
126139
return false;
140+
}
127141

128142
// We grab all empty slots.
129143
var availableSlots = _managerSlots.Where(slot => !slot.HasWork).ToImmutableArray();
@@ -136,11 +150,10 @@ private bool RunWorkInParallel()
136150
var workloadsToAdd = availableWorkloads.Take(amount).ToImmutableArray();
137151

138152
// We associate each workload to a slot, if we reached the max parallel
139-
// level, then we will run only initalize step of the given workload.
153+
// level, then we will run only initialize step of the given workload.
140154
for (int i = 0; i < amount; i++)
141155
{
142156
var slot = availableSlots[i];
143-
slot.HasWork = true;
144157
var workload = workloadsToAdd[i];
145158
slot.ShouldPreStart = occupiedSlots + i + 1 > MaxParallelLevel;
146159

@@ -152,6 +165,13 @@ private bool RunWorkInParallel()
152165
slot.Work = workload.Work;
153166

154167
_workloads.Remove(workload);
168+
169+
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Adding 1 workload to slot, remaining workloads {_workloads.Count}.");
170+
171+
// This must be set last, every loop below looks at this property,
172+
// and they can do so from a different thread. So if we mark it as HasWork before actually assigning the work
173+
// we can pick up the slot, but it has no associated work yet.
174+
slot.HasWork = true;
155175
}
156176

157177
slots = _managerSlots.ToArray();
@@ -172,12 +192,16 @@ private bool RunWorkInParallel()
172192
{
173193
startedWork++;
174194
slot.IsRunning = true;
175-
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Running on pre-started host: {(DateTime.Now.TimeOfDay - slot.PreStartTime).TotalMilliseconds}ms {slot.InitTask?.Status}");
195+
if (EqtTrace.IsVerboseEnabled)
196+
{
197+
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Running on pre-started host for work (source) {GetSourcesForSlotExpensive(slot)}: {(DateTime.Now.TimeOfDay - slot.PreStartTime).TotalMilliseconds}ms {slot.InitTask?.Status}");
198+
}
176199
_runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask);
177200

178201
// We already started as many as we were allowed, jump out;
179202
if (startedWork == MaxParallelLevel)
180203
{
204+
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {startedWork} work items, which is the max parallel level. Won't start more work.");
181205
break;
182206
}
183207
}
@@ -194,14 +218,18 @@ private bool RunWorkInParallel()
194218
{
195219
startedWork++;
196220
slot.IsRunning = true;
197-
EqtTrace.Verbose("ParallelOperationManager.RunWorkInParallel: Started work on a host.");
221+
if (EqtTrace.IsVerboseEnabled)
222+
{
223+
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Started host in slot number {slot.Index} for work (source): {GetSourcesForSlotExpensive(slot)}.");
224+
}
198225
_runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask);
199226
}
200227
}
201228

202229
// We already started as many as we were allowed, jump out;
203230
if (startedWork == MaxParallelLevel)
204231
{
232+
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {startedWork} work items, which is the max parallel level. Won't start more work.");
205233
break;
206234
}
207235
}
@@ -215,14 +243,19 @@ private bool RunWorkInParallel()
215243
preStartedWork++;
216244
slot.PreStartTime = DateTime.Now.TimeOfDay;
217245
slot.IsPreStarted = true;
218-
EqtTrace.Verbose("ParallelOperationManager.RunWorkInParallel: Pre-starting a host.");
246+
if (EqtTrace.IsVerboseEnabled)
247+
{
248+
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Pre-starting a host for work (source): {GetSourcesForSlotExpensive(slot)}.");
249+
}
219250
slot.InitTask = _initializeWorkload!(slot.Manager!, slot.EventHandler!, slot.Work!);
220251
}
221252
}
222253

223254
// Return true when we started more work. Or false, when there was nothing more to do.
224255
// This will propagate to handling of partial discovery or partial run.
225-
return preStartedWork + startedWork > 0;
256+
var weAddedMoreWork = preStartedWork + startedWork > 0;
257+
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {preStartedWork + startedWork} work items in here, returning {weAddedMoreWork}.");
258+
return weAddedMoreWork;
226259
}
227260

228261
public bool RunNextWork(TManager completedManager)
@@ -258,6 +291,10 @@ private void ClearCompletedSlot(TManager completedManager)
258291
throw new InvalidOperationException("The provided manager was found in multiple slots.");
259292
}
260293

294+
if (EqtTrace.IsVerboseEnabled)
295+
{
296+
EqtTrace.Verbose($"ParallelOperationManager.ClearCompletedSlot: Clearing slot number {completedSlot[0].Index} with work (source): {GetSourcesForSlotExpensive(completedSlot[0])}.");
297+
}
261298
var slot = completedSlot[0];
262299
slot.PreStartTime = TimeSpan.Zero;
263300
slot.Work = default(TWorkload);
@@ -273,8 +310,14 @@ private void ClearCompletedSlot(TManager completedManager)
273310
}
274311
}
275312

313+
private static string GetSourcesForSlotExpensive(ParallelOperationManager<TManager, TEventHandler, TWorkload>.Slot slot)
314+
{
315+
return string.Join(", ", (slot.Work as DiscoveryCriteria)?.Sources ?? (slot.Work as TestRunCriteria)?.Sources ?? Array.Empty<string>());
316+
}
317+
276318
public void DoActionOnAllManagers(Action<TManager> action, bool doActionsInParallel = false)
277319
{
320+
EqtTrace.Verbose($"ParallelOperationManager.DoActionOnAllManagers: Calling an action on all managers.");
278321
// We don't need to lock here, we just grab the current list of
279322
// slots that are occupied (have managers) and run action on each one of them.
280323
var managers = _managerSlots.Where(slot => slot.HasWork).Select(slot => slot.Manager).ToImmutableArray();
@@ -320,11 +363,13 @@ private static void DoManagerAction(Action action)
320363

321364
internal void StopAllManagers()
322365
{
366+
EqtTrace.Verbose($"ParallelOperationManager.StopAllManagers: Stopping all managers.");
323367
ClearSlots(acceptMoreWork: false);
324368
}
325369

326370
public void Dispose()
327371
{
372+
EqtTrace.Verbose($"ParallelOperationManager.Dispose: Disposing all managers.");
328373
ClearSlots(acceptMoreWork: false);
329374
}
330375

src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelProxyDiscoveryManager.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,12 @@ private void DiscoverTestsOnConcurrentManager(
287287
bool initialized,
288288
Task? task)
289289
{
290+
// If we do the scheduling incorrectly this will get null. It should not happen, but it has happened before.
291+
if (discoveryCriteria == null)
292+
{
293+
throw new ArgumentNullException(nameof(discoveryCriteria));
294+
}
295+
290296
// Kick off another discovery task for the next source
291297
Task.Run(() =>
292298
{

src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelProxyExecutionManager.cs

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,12 @@ public bool HandlePartialRunComplete(
159159
? _runCompletedClients == _runStartedClients
160160
: _runCompletedClients == _availableWorkloads;
161161

162-
EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Total completed clients = {0}, Run complete = {1}, Run canceled: {2}.", _runCompletedClients, allRunsCompleted, testRunCompleteArgs.IsCanceled);
162+
EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Total workloads = {0}, Total started clients = {1} Total completed clients = {2}, Run complete = {3}, Run canceled: {4}.", _availableWorkloads, _runStartedClients, _runCompletedClients, allRunsCompleted, testRunCompleteArgs.IsCanceled);
163163
}
164164

165165
if (allRunsCompleted)
166166
{
167+
EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: All runs completed stopping all managers.");
167168
_parallelOperationManager.StopAllManagers();
168169
return true;
169170
}
@@ -185,8 +186,13 @@ public bool HandlePartialRunComplete(
185186
// {
186187
// return true;
187188
// }
189+
EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Not cancelled or aborted, running next work.");
188190
var _ = _parallelOperationManager.RunNextWork(proxyExecutionManager);
189191
}
192+
else
193+
{
194+
EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Cancelled or aborted, not running next work.");
195+
}
190196

191197
return false;
192198
}
@@ -403,7 +409,7 @@ private Task PrepareTestRunOnConcurrentManager(IProxyExecutionManager proxyExecu
403409
// clients to be done running their workloads when aborting/cancelling and that doesn't
404410
// happen with an initialized workload that is never run.
405411
//
406-
// Interlocked.Increment(ref _runStartedClients);
412+
// Interlocked.Increment(ref _runStartedClients); <- BUG: Is this a bug waiting to happen for pre-started hosts?
407413
proxyExecutionManager.InitializeTestRun(testRunCriteria, eventHandler);
408414
});
409415
}
@@ -421,54 +427,60 @@ private void StartTestRunOnConcurrentManager(
421427
bool initialized,
422428
Task? initTask)
423429
{
424-
if (testRunCriteria != null)
430+
// If we do the scheduling incorrectly this will get null. It should not happen, but it has happened before.
431+
if (testRunCriteria == null)
425432
{
426-
Task.Run(() =>
427-
{
428-
if (!initialized)
429-
{
430-
if (!proxyExecutionManager.IsInitialized)
431-
{
432-
proxyExecutionManager.Initialize(_skipDefaultAdapters);
433-
}
433+
throw new ArgumentNullException(nameof(testRunCriteria));
434+
}
434435

435-
Interlocked.Increment(ref _runStartedClients);
436-
proxyExecutionManager.InitializeTestRun(testRunCriteria, eventHandler);
437-
}
438-
else
436+
Task.Run(() =>
437+
{
438+
if (!initialized)
439+
{
440+
if (!proxyExecutionManager.IsInitialized)
439441
{
440-
initTask!.Wait();
442+
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Initializing uninitialized client. Started clients: " + _runStartedClients);
443+
proxyExecutionManager.Initialize(_skipDefaultAdapters);
441444
}
442445

443-
EqtTrace.Verbose("ParallelProxyExecutionManager: Execution started. Started clients: " + _runStartedClients);
446+
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Initializing test run. Started clients: " + _runStartedClients);
447+
Interlocked.Increment(ref _runStartedClients);
448+
proxyExecutionManager.InitializeTestRun(testRunCriteria, eventHandler);
449+
}
450+
else
451+
{
452+
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Waiting for pre-initialized client to finish initialization. Started clients: " + _runStartedClients);
453+
initTask!.Wait();
454+
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Pre-initialized client finished initialization. Started clients: " + _runStartedClients);
455+
}
444456

445-
proxyExecutionManager.StartTestRun(testRunCriteria, eventHandler);
446-
})
447-
.ContinueWith(t =>
448-
{
449-
// Just in case, the actual execution couldn't start for an instance. Ensure that
450-
// we call execution complete since we have already fetched a source. Otherwise
451-
// execution will not terminate
452-
EqtTrace.Error("ParallelProxyExecutionManager: Failed to trigger execution. Exception: " + t.Exception);
453-
454-
var handler = eventHandler;
455-
var exceptionToString = t.Exception?.ToString();
456-
var testMessagePayload = new TestMessagePayload { MessageLevel = TestMessageLevel.Error, Message = exceptionToString };
457-
handler.HandleRawMessage(_dataSerializer.SerializePayload(MessageType.TestMessage, testMessagePayload));
458-
handler.HandleLogMessage(TestMessageLevel.Error, exceptionToString);
459-
460-
// Send a run complete to caller. Similar logic is also used in ProxyExecutionManager.StartTestRun
461-
// Differences:
462-
// Aborted is sent to allow the current execution manager replaced with another instance
463-
// Ensure that the test run aggregator in parallel run events handler doesn't add these statistics
464-
// (since the test run didn't even start)
465-
var completeArgs = new TestRunCompleteEventArgs(null, false, true, null, new Collection<AttachmentSet>(), new Collection<InvokedDataCollector>(), TimeSpan.Zero);
466-
handler.HandleTestRunComplete(completeArgs, null, null, null);
467-
},
468-
TaskContinuationOptions.OnlyOnFaulted);
469-
}
457+
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Execution starting. Started clients: " + _runStartedClients);
470458

471-
EqtTrace.Verbose("ProxyParallelExecutionManager: No sources available for execution.");
459+
proxyExecutionManager.StartTestRun(testRunCriteria, eventHandler);
460+
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Execution started. Started clients: " + _runStartedClients);
461+
})
462+
.ContinueWith(t =>
463+
{
464+
// Just in case, the actual execution couldn't start for an instance. Ensure that
465+
// we call execution complete since we have already fetched a source. Otherwise
466+
// execution will not terminate
467+
EqtTrace.Error("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager(continuation): Failed to trigger execution. Exception: " + t.Exception);
468+
469+
var handler = eventHandler;
470+
var exceptionToString = t.Exception?.ToString();
471+
var testMessagePayload = new TestMessagePayload { MessageLevel = TestMessageLevel.Error, Message = exceptionToString };
472+
handler.HandleRawMessage(_dataSerializer.SerializePayload(MessageType.TestMessage, testMessagePayload));
473+
handler.HandleLogMessage(TestMessageLevel.Error, exceptionToString);
474+
475+
// Send a run complete to caller. Similar logic is also used in ProxyExecutionManager.StartTestRun
476+
// Differences:
477+
// Aborted is sent to allow the current execution manager replaced with another instance
478+
// Ensure that the test run aggregator in parallel run events handler doesn't add these statistics
479+
// (since the test run didn't even start)
480+
var completeArgs = new TestRunCompleteEventArgs(null, false, true, null, new Collection<AttachmentSet>(), new Collection<InvokedDataCollector>(), TimeSpan.Zero);
481+
handler.HandleTestRunComplete(completeArgs, null, null, null);
482+
},
483+
TaskContinuationOptions.OnlyOnFaulted);
472484
}
473485

474486
public void InitializeTestRun(TestRunCriteria testRunCriteria, IInternalTestRunEventsHandler eventHandler)

src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelRunEventsHandler.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,12 @@ public virtual void HandleTestRunComplete(
6666
ICollection<AttachmentSet>? runContextAttachments,
6767
ICollection<string>? executorUris)
6868
{
69+
EqtTrace.Verbose($"ParallelRunEventsHandler.HandleTestRunComplete: Handling a run completion, this can be either one part of parallel run completing, or the whole parallel run completing.");
6970
var parallelRunComplete = HandleSingleTestRunComplete(testRunCompleteArgs, lastChunkArgs, runContextAttachments, executorUris);
7071

7172
if (parallelRunComplete)
7273
{
74+
EqtTrace.Verbose($"ParallelRunEventsHandler.HandleTestRunComplete: Whole parallel run completed.");
7375
var completedArgs = new TestRunCompleteEventArgs(_runDataAggregator.GetAggregatedRunStats(),
7476
_runDataAggregator.IsCanceled,
7577
_runDataAggregator.IsAborted,
@@ -96,6 +98,10 @@ public virtual void HandleTestRunComplete(
9698

9799
HandleParallelTestRunComplete(completedArgs);
98100
}
101+
else
102+
{
103+
EqtTrace.Verbose($"ParallelRunEventsHandler.HandleTestRunComplete: Single part of parallel run completed, but whole run is not complete yet.");
104+
}
99105
}
100106

101107
protected bool HandleSingleTestRunComplete(TestRunCompleteEventArgs testRunCompleteArgs,

0 commit comments

Comments
 (0)