Skip to content

Commit 8b8e4a0

Browse files
authored
Fix network recovering (#62)
# Description Fix a number of problems when handling disconnections, and with asynchronous tasks in general. ## Network disconnection When `AbstractProtocol.AutoRecover` is set to true: * requests sent through the network would never be resolved when the connection is lost before they receive a response. This means that clients awaiting for a response would freeze forever. With this PR, those tasks are correctly rejected with a `ConnectionLost` exception (we cannot queue them: as far as the SDK is concerned, it cannot know if these requests were received and processed by Kuzzle or not) * there was a race condition when triggering the threaded recovering process where, in rare cases, 2 reconnection attempts would run in parallel, doubling the number of opened sockets ## Asynchronous tasks * Race conditions could occur and corrupt the requests cache, or freeze the SDK because of an unhandled exception, because of an incorrect use of `lock` (this keyword has no impact when multiple accesses are made to the same "locked" object within the same thread) => semaphores are now used to circumvent the problem * Requests tasks were incorrectly configured, making their resolution synchronous. This means that resolving tasks in the SDK event handlers (which manage network state changes) could prevent them to finish in a timely fashion (or... ever) because they would synchronously trigger awaiting code # Boyscout * Save our standard code style directly in the solution to share it to whomever works on the project
1 parent 7781e6f commit 8b8e4a0

File tree

7 files changed

+142
-75
lines changed

7 files changed

+142
-75
lines changed

Kuzzle.Tests/Offline/Query/QueryReplayerTest.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,12 @@ public void SuccessRejectAllQueries() {
147147
testableOfflineManager.MaxQueueSize = -1;
148148
queryReplayer.Lock = false;
149149

150-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foo', action: 'bar'}")));
151-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'bar', action: 'foor'}")));
152-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'foobar'}")));
153-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'foobar'}")));
154-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'barfoo'}")));
155-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'barfoo'}")));
150+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '1', controller: 'foo', action: 'bar'}")));
151+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '2', controller: 'bar', action: 'foor'}")));
152+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '3', controller: 'foobar', action: 'foobar'}")));
153+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '4', controller: 'barfoo', action: 'foobar'}")));
154+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '5', controller: 'foobar', action: 'barfoo'}")));
155+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '6', controller: 'barfoo', action: 'barfoo'}")));
156156

157157
Assert.Equal(6, queryReplayer.Count);
158158

@@ -168,12 +168,12 @@ public void SuccessRejectQueries() {
168168
testableOfflineManager.MaxQueueSize = -1;
169169
queryReplayer.Lock = false;
170170

171-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foo', action: 'bar'}")));
172-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'bar', action: 'foor'}")));
173-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'foobar'}")));
174-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'foobar'}")));
175-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'barfoo'}")));
176-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'barfoo'}")));
171+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '1', controller: 'foo', action: 'bar'}")));
172+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '2', controller: 'bar', action: 'foor'}")));
173+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '3', controller: 'foobar', action: 'foobar'}")));
174+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '4', controller: 'barfoo', action: 'foobar'}")));
175+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '5', controller: 'foobar', action: 'barfoo'}")));
176+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '6', controller: 'barfoo', action: 'barfoo'}")));
177177

178178
Assert.Equal(6, queryReplayer.Count);
179179

Kuzzle/Kuzzle.cs

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ internal interface IKuzzle {
6868
public sealed class Kuzzle : IKuzzleApi, IKuzzle {
6969
private AbstractProtocol networkProtocol;
7070

71+
private SemaphoreSlim requestsSemaphore = new SemaphoreSlim(1, 1);
72+
7173
internal readonly Dictionary<string, TaskCompletionSource<Response>>
7274
requests = new Dictionary<string, TaskCompletionSource<Response>>();
7375

@@ -219,7 +221,6 @@ public AbstractProtocol NetworkProtocol {
219221
}
220222
}
221223

222-
223224
/// <summary>
224225
/// Handles the ResponseEvent event from the network protocol
225226
/// </summary>
@@ -228,39 +229,55 @@ public AbstractProtocol NetworkProtocol {
228229
internal void ResponsesListener(object sender, string payload) {
229230
Response response = Response.FromString(payload);
230231

231-
if (requests.ContainsKey(response.Room)) {
232-
if (response.Error != null) {
233-
if (response.Error.Message == "Token expired") {
234-
EventHandler.DispatchTokenExpired();
235-
}
232+
if (!requests.ContainsKey(response.Room)) {
233+
EventHandler.DispatchUnhandledResponse(response);
234+
return;
235+
}
236236

237-
requests[response.RequestId].SetException(
238-
new Exceptions.ApiErrorException(response));
239-
} else {
240-
requests[response.RequestId].SetResult(response);
241-
}
237+
TaskCompletionSource<Response> task = requests[response.RequestId];
242238

243-
lock (requests) {
244-
requests.Remove(response.RequestId);
239+
if (response.Error != null) {
240+
if (response.Error.Message == "Token expired") {
241+
EventHandler.DispatchTokenExpired();
245242
}
246243

247-
Offline?.QueryReplayer?.Remove((obj) => obj["requestId"].ToString() == response.RequestId);
244+
task.SetException(new Exceptions.ApiErrorException(response));
245+
}
246+
else {
247+
task.SetResult(response);
248+
}
248249

249-
} else {
250-
EventHandler.DispatchUnhandledResponse(response);
250+
requestsSemaphore.Wait();
251+
try {
252+
requests.Remove(response.RequestId);
253+
}
254+
finally {
255+
requestsSemaphore.Release();
251256
}
257+
258+
Offline?.QueryReplayer?.Remove(
259+
(obj) => obj["requestId"].ToString() == response.RequestId);
252260
}
253261

254262
internal void StateChangeListener(object sender, ProtocolState state) {
255-
// If not connected anymore: close tasks and clean up the requests buffer
256-
if (state == ProtocolState.Closed) {
257-
lock (requests) {
263+
// If not connected anymore: close pending tasks and clean up the requests
264+
// buffer.
265+
// If reconnecting, only requests submitted AFTER the disconnection event
266+
// can be queued: we have no information about requests submitted before
267+
// that event. For all we know, Kuzzle could have received & processed
268+
// those requests, but couldn't forward the response to us
269+
if (state == ProtocolState.Closed || state == ProtocolState.Reconnecting) {
270+
requestsSemaphore.Wait();
271+
try {
258272
foreach (var task in requests.Values) {
259273
task.SetException(new Exceptions.ConnectionLostException());
260274
}
261275

262276
requests.Clear();
263277
}
278+
finally {
279+
requestsSemaphore.Release();
280+
}
264281
}
265282
}
266283

@@ -376,14 +393,16 @@ public ConfiguredTaskAwaitable<Response> QueryAsync(JObject query) {
376393
query["volatile"]["sdkVersion"] = Version;
377394
query["volatile"]["sdkInstanceId"] = InstanceId;
378395

396+
requestsSemaphore.Wait();
397+
requests[requestId] = new TaskCompletionSource<Response>(
398+
TaskCreationOptions.RunContinuationsAsynchronously);
399+
requestsSemaphore.Release();
400+
379401
if (NetworkProtocol.State == ProtocolState.Open) {
380402
NetworkProtocol.Send(query);
381-
} else if (NetworkProtocol.State == ProtocolState.Reconnecting) {
382-
Offline.QueryReplayer.Enqueue(query);
383403
}
384-
385-
lock (requests) {
386-
requests[requestId] = new TaskCompletionSource<Response>();
404+
else if (NetworkProtocol.State == ProtocolState.Reconnecting) {
405+
Offline.QueryReplayer.Enqueue(query);
387406
}
388407

389408
return requests[requestId].Task.ConfigureAwait(false);

Kuzzle/Kuzzle.csproj

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
<Description>Official C# SDK for Kuzzle</Description>
1212
<RootNamespace>KuzzleSdk</RootNamespace>
1313
<AssemblyVersion>1.0.0.0</AssemblyVersion>
14-
<ReleaseVersion>1.0.0</ReleaseVersion>
1514
<PackageVersion>1.0.0</PackageVersion>
1615
<PackOnBuild>false</PackOnBuild>
1716
</PropertyGroup>
@@ -46,6 +45,10 @@
4645
<MonoDevelop>
4746
<Properties>
4847
<Deployment.LinuxDeployData generatePcFile="False" />
48+
<Policies>
49+
<TextStylePolicy RemoveTrailingWhitespace="True" NoTabsAfterNonTabs="False" EolMarker="Native" FileWidth="80" TabWidth="2" TabsToSpaces="True" IndentWidth="2" scope="text/x-csharp" />
50+
<CSharpFormattingPolicy IndentBlock="True" IndentBraces="False" IndentSwitchSection="True" IndentSwitchCaseSection="True" LabelPositioning="OneLess" NewLineForElse="True" NewLineForCatch="True" NewLineForFinally="True" NewLineForMembersInObjectInit="True" NewLineForMembersInAnonymousTypes="True" NewLineForClausesInQuery="True" SpaceWithinMethodDeclarationParenthesis="False" SpaceBetweenEmptyMethodDeclarationParentheses="False" SpaceAfterMethodCallName="False" SpaceWithinMethodCallParentheses="False" SpaceBetweenEmptyMethodCallParentheses="False" SpaceAfterControlFlowStatementKeyword="True" SpaceWithinExpressionParentheses="False" SpaceWithinCastParentheses="False" SpaceWithinOtherParentheses="False" SpaceAfterCast="False" SpacesIgnoreAroundVariableDeclaration="False" SpaceBeforeOpenSquareBracket="False" SpaceBetweenEmptySquareBrackets="False" SpaceWithinSquareBrackets="False" SpaceAfterColonInBaseTypeDeclaration="True" SpaceAfterComma="True" SpaceAfterDot="False" SpaceAfterSemicolonsInForStatement="True" SpaceBeforeColonInBaseTypeDeclaration="True" SpaceBeforeComma="False" SpaceBeforeDot="False" SpaceBeforeSemicolonsInForStatement="False" SpacingAroundBinaryOperator="Single" WrappingPreserveSingleLine="True" WrappingKeepStatementsOnSingleLine="True" PlaceSystemDirectiveFirst="True" NewLinesForBracesInTypes="False" NewLinesForBracesInMethods="False" NewLinesForBracesInProperties="False" NewLinesForBracesInAccessors="False" NewLinesForBracesInAnonymousMethods="False" NewLinesForBracesInControlBlocks="False" NewLinesForBracesInAnonymousTypes="False" NewLinesForBracesInObjectCollectionArrayInitializers="False" NewLinesForBracesInLambdaExpressionBody="False" SpacingAfterMethodDeclarationName="True" scope="text/x-csharp" />
51+
</Policies>
4952
</Properties>
5053
</MonoDevelop>
5154
</ProjectExtensions>

Kuzzle/Offline/OfflineManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,6 @@ internal override void OnUserLoggedIn(object sender, UserLoggedInEvent e) {
226226

227227
internal void StateChangeListener(object sender, ProtocolState state) {
228228
if (state == ProtocolState.Open && previousState == ProtocolState.Reconnecting) {
229-
230229
kuzzle.GetEventHandler().DispatchReconnected();
231230

232231
Task.Run(async () => {
@@ -235,6 +234,7 @@ internal void StateChangeListener(object sender, ProtocolState state) {
235234
});
236235

237236
}
237+
238238
previousState = state;
239239
}
240240

Kuzzle/Offline/Query/QueryReplayer.cs

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Diagnostics;
44
using System.Threading;
55
using System.Threading.Tasks;
6+
using KuzzleSdk.API;
67
using KuzzleSdk.API.Offline;
78
using Newtonsoft.Json.Linq;
89

@@ -73,6 +74,7 @@ internal sealed class QueryReplayer : IQueryReplayer {
7374
private CancellationTokenSource cancellationTokenSource;
7475
private bool currentlyReplaying = false;
7576
private Stopwatch stopWatch = new Stopwatch();
77+
private SemaphoreSlim queueSemaphore = new SemaphoreSlim(1, 1);
7678

7779
/// <summary>
7880
/// Tells if the QueryReplayer is locked (i.e. it doesn't accept new queries).
@@ -103,28 +105,35 @@ internal QueryReplayer(IOfflineManager offlineManager, IKuzzle kuzzle) {
103105
public bool Enqueue(JObject query) {
104106
if (Lock || WaitLoginToReplay) return false;
105107

106-
lock (queue) {
108+
queueSemaphore.Wait();
109+
try {
107110
if (queue.Count < offlineManager.MaxQueueSize || offlineManager.MaxQueueSize < 0) {
108111
if (queue.Count == 0) {
109112
stopWatch.Reset();
110113
stopWatch.Start();
111114
queue.Add(new TimedQuery(query, 0));
112-
} else {
115+
}
116+
else {
113117
TimedQuery previous = queue[queue.Count - 1];
114118
Int64 elapsedTime = stopWatch.ElapsedMilliseconds - previous.Time;
115119
elapsedTime = Math.Min(elapsedTime, offlineManager.MaxRequestDelay);
116120
queue.Add(new TimedQuery(query, previous.Time + elapsedTime));
117121
}
118-
if (query["controller"]?.ToString() == "auth"
119-
&& (query["action"]?.ToString() == "login"
120-
|| query["action"]?.ToString() == "logout")
121-
) {
122-
Lock = true;
123-
}
122+
123+
String controller = query["controller"]?.ToString();
124+
String action = query["action"]?.ToString();
125+
126+
if (controller == "auth" && (action == "login" || action == "logout")) {
127+
Lock = true;
128+
}
124129

125130
return true;
126131
}
127132
}
133+
finally {
134+
queueSemaphore.Release();
135+
}
136+
128137
return false;
129138
}
130139

@@ -139,7 +148,8 @@ public int Count {
139148
/// Remove and return the first query that has been added to the queue.
140149
/// </summary>
141150
public JObject Dequeue() {
142-
lock (queue) {
151+
queueSemaphore.Wait();
152+
try {
143153
if (queue.Count == 0) {
144154
return null;
145155
}
@@ -149,6 +159,9 @@ public JObject Dequeue() {
149159

150160
return query;
151161
}
162+
finally {
163+
queueSemaphore.Release();
164+
}
152165
}
153166

154167
/// <summary>
@@ -163,27 +176,42 @@ public void RejectAllQueries(Exception exception) {
163176
/// it is set with an exception and removed from the replayable queue.
164177
/// </summary>
165178
public void RejectQueries(Predicate<JObject> predicate, Exception exception) {
166-
lock (queue) {
179+
queueSemaphore.Wait();
180+
try {
167181
foreach (TimedQuery timedQuery in queue) {
168182
if (predicate(timedQuery.Query)) {
169-
kuzzle.GetRequestById(timedQuery.Query["requestId"]?.ToString())?.SetException(exception);
183+
String requestId = timedQuery.Query["requestId"]?.ToString();
184+
185+
if (requestId != null) {
186+
TaskCompletionSource<Response> task = kuzzle.GetRequestById(requestId);
187+
188+
if (task != null) {
189+
task.SetException(exception);
190+
}
191+
}
170192
}
171193
}
194+
172195
queue.RemoveAll((obj) => predicate(obj.Query));
196+
173197
if (queue.Count == 0) {
174198
Lock = false;
175199
currentlyReplaying = false;
176200
WaitLoginToReplay = false;
177201
}
178202
}
203+
finally {
204+
queueSemaphore.Release();
205+
}
179206
}
180207

181208
/// <summary>
182209
/// Remove every query that satisfies the predicate
183210
/// </summary>
184211
/// <returns>How many items where removed.</returns>
185212
public int Remove(Predicate<JObject> predicate) {
186-
lock (queue) {
213+
queueSemaphore.Wait();
214+
try {
187215
if (queue.Count > 0) {
188216
Predicate<TimedQuery> timedQueryPredicate = timedQuery => predicate(timedQuery.Query);
189217
int itemsRemoved = queue.RemoveAll(timedQueryPredicate);
@@ -196,19 +224,22 @@ public int Remove(Predicate<JObject> predicate) {
196224
return itemsRemoved;
197225
}
198226
}
227+
finally {
228+
queueSemaphore.Release();
229+
}
199230
return 0;
200231
}
201232

202233
/// <summary>
203234
/// Clear the queue.
204235
/// </summary>
205236
public void Clear() {
206-
lock (queue) {
207-
queue.Clear();
208-
Lock = false;
209-
currentlyReplaying = false;
210-
WaitLoginToReplay = false;
211-
}
237+
queueSemaphore.Wait();
238+
queue.Clear();
239+
Lock = false;
240+
currentlyReplaying = false;
241+
WaitLoginToReplay = false;
242+
queueSemaphore.Release();
212243
}
213244

214245
internal delegate Task ReplayQueryFunc(TimedQuery timedQuery, CancellationToken cancellationToken);
@@ -255,7 +286,8 @@ public CancellationTokenSource ReplayQueries(Predicate<JObject> predicate, bool
255286

256287
if (resetWaitLogin) WaitLoginToReplay = false;
257288

258-
lock (queue) {
289+
queueSemaphore.Wait();
290+
try {
259291
if (queue.Count > 0) {
260292
currentlyReplaying = true;
261293

@@ -267,6 +299,9 @@ public CancellationTokenSource ReplayQueries(Predicate<JObject> predicate, bool
267299

268300
}
269301
}
302+
finally {
303+
queueSemaphore.Release();
304+
}
270305
return cancellationTokenSource;
271306
}
272307

0 commit comments

Comments
 (0)