Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions Kuzzle.Tests/Offline/Query/QueryReplayerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ public void SuccessRejectAllQueries() {
testableOfflineManager.MaxQueueSize = -1;
queryReplayer.Lock = false;

Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foo', action: 'bar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'bar', action: 'foor'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'barfoo'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'barfoo'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '1', controller: 'foo', action: 'bar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '2', controller: 'bar', action: 'foor'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '3', controller: 'foobar', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '4', controller: 'barfoo', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '5', controller: 'foobar', action: 'barfoo'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '6', controller: 'barfoo', action: 'barfoo'}")));

Assert.Equal(6, queryReplayer.Count);

Expand All @@ -168,12 +168,12 @@ public void SuccessRejectQueries() {
testableOfflineManager.MaxQueueSize = -1;
queryReplayer.Lock = false;

Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foo', action: 'bar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'bar', action: 'foor'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'barfoo'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'barfoo'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '1', controller: 'foo', action: 'bar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '2', controller: 'bar', action: 'foor'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '3', controller: 'foobar', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '4', controller: 'barfoo', action: 'foobar'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '5', controller: 'foobar', action: 'barfoo'}")));
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '6', controller: 'barfoo', action: 'barfoo'}")));

Assert.Equal(6, queryReplayer.Count);

Expand Down
67 changes: 43 additions & 24 deletions Kuzzle/Kuzzle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ internal interface IKuzzle {
public sealed class Kuzzle : IKuzzleApi, IKuzzle {
private AbstractProtocol networkProtocol;

private SemaphoreSlim requestsSemaphore = new SemaphoreSlim(1, 1);

internal readonly Dictionary<string, TaskCompletionSource<Response>>
requests = new Dictionary<string, TaskCompletionSource<Response>>();

Expand Down Expand Up @@ -220,7 +222,6 @@ public AbstractProtocol NetworkProtocol {
}
}


/// <summary>
/// Handles the ResponseEvent event from the network protocol
/// </summary>
Expand All @@ -229,39 +230,55 @@ public AbstractProtocol NetworkProtocol {
internal void ResponsesListener(object sender, string payload) {
Response response = Response.FromString(payload);

if (requests.ContainsKey(response.Room)) {
if (response.Error != null) {
if (response.Error.Message == "Token expired") {
EventHandler.DispatchTokenExpired();
}
if (!requests.ContainsKey(response.Room)) {
EventHandler.DispatchUnhandledResponse(response);
return;
}

requests[response.RequestId].SetException(
new Exceptions.ApiErrorException(response));
} else {
requests[response.RequestId].SetResult(response);
}
TaskCompletionSource<Response> task = requests[response.RequestId];

lock (requests) {
requests.Remove(response.RequestId);
if (response.Error != null) {
if (response.Error.Message == "Token expired") {
EventHandler.DispatchTokenExpired();
}

Offline?.QueryReplayer?.Remove((obj) => obj["requestId"].ToString() == response.RequestId);
task.SetException(new Exceptions.ApiErrorException(response));
}
else {
task.SetResult(response);
}

} else {
EventHandler.DispatchUnhandledResponse(response);
requestsSemaphore.Wait();
try {
requests.Remove(response.RequestId);
}
finally {
requestsSemaphore.Release();
}

Offline?.QueryReplayer?.Remove(
(obj) => obj["requestId"].ToString() == response.RequestId);
}

internal void StateChangeListener(object sender, ProtocolState state) {
// If not connected anymore: close tasks and clean up the requests buffer
if (state == ProtocolState.Closed) {
lock (requests) {
// If not connected anymore: close pending tasks and clean up the requests
// buffer.
// If reconnecting, only requests submitted AFTER the disconnection event
// can be queued: we have no information about requests submitted before
// that event. For all we know, Kuzzle could have received & processed
// those requests, but couldn't forward the response to us
if (state == ProtocolState.Closed || state == ProtocolState.Reconnecting) {
requestsSemaphore.Wait();
try {
foreach (var task in requests.Values) {
task.SetException(new Exceptions.ConnectionLostException());
}

requests.Clear();
}
finally {
requestsSemaphore.Release();
}
}
}

Expand Down Expand Up @@ -382,14 +399,16 @@ public ConfiguredTaskAwaitable<Response> QueryAsync(JObject query) {
query["volatile"]["sdkName"] = SdkName;
query["volatile"]["sdkInstanceId"] = InstanceId;

requestsSemaphore.Wait();
requests[requestId] = new TaskCompletionSource<Response>(
TaskCreationOptions.RunContinuationsAsynchronously);
requestsSemaphore.Release();

if (NetworkProtocol.State == ProtocolState.Open) {
NetworkProtocol.Send(query);
} else if (NetworkProtocol.State == ProtocolState.Reconnecting) {
Offline.QueryReplayer.Enqueue(query);
}

lock (requests) {
requests[requestId] = new TaskCompletionSource<Response>();
else if (NetworkProtocol.State == ProtocolState.Reconnecting) {
Offline.QueryReplayer.Enqueue(query);
}

return requests[requestId].Task.ConfigureAwait(false);
Expand Down
4 changes: 4 additions & 0 deletions Kuzzle/Kuzzle.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
<MonoDevelop>
<Properties>
<Deployment.LinuxDeployData generatePcFile="False" />
<Policies>
<TextStylePolicy RemoveTrailingWhitespace="True" NoTabsAfterNonTabs="False" EolMarker="Native" FileWidth="80" TabWidth="2" TabsToSpaces="True" IndentWidth="2" scope="text/x-csharp" />
<CSharpFormattingPolicy IndentBlock="True" IndentBraces="False" IndentSwitchSection="True" IndentSwitchCaseSection="True" LabelPositioning="OneLess" NewLineForElse="True" NewLineForCatch="True" NewLineForFinally="True" NewLineForMembersInObjectInit="True" NewLineForMembersInAnonymousTypes="True" NewLineForClausesInQuery="True" SpaceWithinMethodDeclarationParenthesis="False" SpaceBetweenEmptyMethodDeclarationParentheses="False" SpaceAfterMethodCallName="False" SpaceWithinMethodCallParentheses="False" SpaceBetweenEmptyMethodCallParentheses="False" SpaceAfterControlFlowStatementKeyword="True" SpaceWithinExpressionParentheses="False" SpaceWithinCastParentheses="False" SpaceWithinOtherParentheses="False" SpaceAfterCast="False" SpacesIgnoreAroundVariableDeclaration="False" SpaceBeforeOpenSquareBracket="False" SpaceBetweenEmptySquareBrackets="False" SpaceWithinSquareBrackets="False" SpaceAfterColonInBaseTypeDeclaration="True" SpaceAfterComma="True" SpaceAfterDot="False" SpaceAfterSemicolonsInForStatement="True" SpaceBeforeColonInBaseTypeDeclaration="True" SpaceBeforeComma="False" SpaceBeforeDot="False" SpaceBeforeSemicolonsInForStatement="False" SpacingAroundBinaryOperator="Single" WrappingPreserveSingleLine="True" WrappingKeepStatementsOnSingleLine="True" PlaceSystemDirectiveFirst="True" NewLinesForBracesInTypes="False" NewLinesForBracesInMethods="False" NewLinesForBracesInProperties="False" NewLinesForBracesInAccessors="False" NewLinesForBracesInAnonymousMethods="False" NewLinesForBracesInControlBlocks="False" NewLinesForBracesInAnonymousTypes="False" NewLinesForBracesInObjectCollectionArrayInitializers="False" NewLinesForBracesInLambdaExpressionBody="False" SpacingAfterMethodDeclarationName="True" scope="text/x-csharp" />
</Policies>
</Properties>
</MonoDevelop>
</ProjectExtensions>
Expand Down
2 changes: 1 addition & 1 deletion Kuzzle/Offline/OfflineManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ internal override void OnUserLoggedIn(object sender, UserLoggedInEvent e) {

internal void StateChangeListener(object sender, ProtocolState state) {
if (state == ProtocolState.Open && previousState == ProtocolState.Reconnecting) {

kuzzle.GetEventHandler().DispatchReconnected();

Task.Run(async () => {
Expand All @@ -235,6 +234,7 @@ internal void StateChangeListener(object sender, ProtocolState state) {
});

}

previousState = state;
}

Expand Down
73 changes: 54 additions & 19 deletions Kuzzle/Offline/Query/QueryReplayer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using KuzzleSdk.API;
using KuzzleSdk.API.Offline;
using Newtonsoft.Json.Linq;

Expand Down Expand Up @@ -73,6 +74,7 @@ internal sealed class QueryReplayer : IQueryReplayer {
private CancellationTokenSource cancellationTokenSource;
private bool currentlyReplaying = false;
private Stopwatch stopWatch = new Stopwatch();
private SemaphoreSlim queueSemaphore = new SemaphoreSlim(1, 1);

/// <summary>
/// Tells if the QueryReplayer is locked (i.e. it doesn't accept new queries).
Expand Down Expand Up @@ -103,28 +105,35 @@ internal QueryReplayer(IOfflineManager offlineManager, IKuzzle kuzzle) {
public bool Enqueue(JObject query) {
if (Lock || WaitLoginToReplay) return false;

lock (queue) {
queueSemaphore.Wait();
try {
if (queue.Count < offlineManager.MaxQueueSize || offlineManager.MaxQueueSize < 0) {
if (queue.Count == 0) {
stopWatch.Reset();
stopWatch.Start();
queue.Add(new TimedQuery(query, 0));
} else {
}
else {
TimedQuery previous = queue[queue.Count - 1];
Int64 elapsedTime = stopWatch.ElapsedMilliseconds - previous.Time;
elapsedTime = Math.Min(elapsedTime, offlineManager.MaxRequestDelay);
queue.Add(new TimedQuery(query, previous.Time + elapsedTime));
}
if (query["controller"]?.ToString() == "auth"
&& (query["action"]?.ToString() == "login"
|| query["action"]?.ToString() == "logout")
) {
Lock = true;
}

String controller = query["controller"]?.ToString();
String action = query["action"]?.ToString();

if (controller == "auth" && (action == "login" || action == "logout")) {
Lock = true;
}

return true;
}
}
finally {
queueSemaphore.Release();
}

return false;
}

Expand All @@ -139,7 +148,8 @@ public int Count {
/// Remove and return the first query that has been added to the queue.
/// </summary>
public JObject Dequeue() {
lock (queue) {
queueSemaphore.Wait();
try {
if (queue.Count == 0) {
return null;
}
Expand All @@ -149,6 +159,9 @@ public JObject Dequeue() {

return query;
}
finally {
queueSemaphore.Release();
}
}

/// <summary>
Expand All @@ -163,27 +176,42 @@ public void RejectAllQueries(Exception exception) {
/// it is set with an exception and removed from the replayable queue.
/// </summary>
public void RejectQueries(Predicate<JObject> predicate, Exception exception) {
lock (queue) {
queueSemaphore.Wait();
try {
foreach (TimedQuery timedQuery in queue) {
if (predicate(timedQuery.Query)) {
kuzzle.GetRequestById(timedQuery.Query["requestId"]?.ToString())?.SetException(exception);
String requestId = timedQuery.Query["requestId"]?.ToString();

if (requestId != null) {
TaskCompletionSource<Response> task = kuzzle.GetRequestById(requestId);

if (task != null) {
task.SetException(exception);
}
}
}
}

queue.RemoveAll((obj) => predicate(obj.Query));

if (queue.Count == 0) {
Lock = false;
currentlyReplaying = false;
WaitLoginToReplay = false;
}
}
finally {
queueSemaphore.Release();
}
}

/// <summary>
/// Remove every query that satisfies the predicate
/// </summary>
/// <returns>How many items where removed.</returns>
public int Remove(Predicate<JObject> predicate) {
lock (queue) {
queueSemaphore.Wait();
try {
if (queue.Count > 0) {
Predicate<TimedQuery> timedQueryPredicate = timedQuery => predicate(timedQuery.Query);
int itemsRemoved = queue.RemoveAll(timedQueryPredicate);
Expand All @@ -196,19 +224,22 @@ public int Remove(Predicate<JObject> predicate) {
return itemsRemoved;
}
}
finally {
queueSemaphore.Release();
}
return 0;
}

/// <summary>
/// Clear the queue.
/// </summary>
public void Clear() {
lock (queue) {
queue.Clear();
Lock = false;
currentlyReplaying = false;
WaitLoginToReplay = false;
}
queueSemaphore.Wait();
queue.Clear();
Lock = false;
currentlyReplaying = false;
WaitLoginToReplay = false;
queueSemaphore.Release();
}

internal delegate Task ReplayQueryFunc(TimedQuery timedQuery, CancellationToken cancellationToken);
Expand Down Expand Up @@ -255,7 +286,8 @@ public CancellationTokenSource ReplayQueries(Predicate<JObject> predicate, bool

if (resetWaitLogin) WaitLoginToReplay = false;

lock (queue) {
queueSemaphore.Wait();
try {
if (queue.Count > 0) {
currentlyReplaying = true;

Expand All @@ -267,6 +299,9 @@ public CancellationTokenSource ReplayQueries(Predicate<JObject> predicate, bool

}
}
finally {
queueSemaphore.Release();
}
return cancellationTokenSource;
}

Expand Down
Loading