diff --git a/src/ZeroMQ.AcceptanceTests/SocketFixtures.cs b/src/ZeroMQ.AcceptanceTests/SocketFixtures.cs index 3a34aeb..9e3fd2f 100644 --- a/src/ZeroMQ.AcceptanceTests/SocketFixtures.cs +++ b/src/ZeroMQ.AcceptanceTests/SocketFixtures.cs @@ -117,8 +117,8 @@ public void Initialize() _senderThread = new Thread(() => { - SenderInit(Sender); Sender.SendHighWatermark = 1; + SenderInit(Sender); _receiverReady.WaitOne(); Sender.Connect("inproc://spec_context"); SenderAction(Sender); @@ -126,8 +126,8 @@ public void Initialize() _receiverThread = new Thread(() => { - ReceiverInit(Receiver); Receiver.SendHighWatermark = 1; + ReceiverInit(Receiver); Receiver.Bind("inproc://spec_context"); _receiverReady.Set(); ReceiverAction(Receiver); @@ -170,4 +170,9 @@ public class UsingThreadedPubSub : UsingThreadedSocketPair { public UsingThreadedPubSub() : base(SocketType.PUB, SocketType.SUB) { } } + + public class UsingThreadedPushPull : UsingThreadedSocketPair + { + public UsingThreadedPushPull() : base(SocketType.PUSH, SocketType.PULL) { } + } } diff --git a/src/ZeroMQ.AcceptanceTests/ZmqSocketTests/Send_Receive.cs b/src/ZeroMQ.AcceptanceTests/ZmqSocketTests/Send_Receive.cs index 23ef808..49aecd2 100644 --- a/src/ZeroMQ.AcceptanceTests/ZmqSocketTests/Send_Receive.cs +++ b/src/ZeroMQ.AcceptanceTests/ZmqSocketTests/Send_Receive.cs @@ -203,5 +203,84 @@ public WhenTransferringWithAPreallocatedReceiveBuffer() }; } } + + public class WhenTransferringWithAnAmpleSendTimeout : UsingThreadedPushPull + { + private const int SendMessageCount = 5; + private int _receivedMessageCount; + + public WhenTransferringWithAnAmpleSendTimeout() + { + SenderInit = push => push.SendHighWatermark = 1; + SenderAction = push => + { + for (int i = 0; i < SendMessageCount; i++) + { + push.SendFrame(Messages.SingleMessage, TimeSpan.FromMilliseconds(500)); + } + }; + + // slow receiver with small HighWatermark + ReceiverInit = pull => pull.ReceiveHighWatermark = 1; + ReceiverAction = pull => + { + for (int i = 0; i < SendMessageCount; i++) + { + Thread.Sleep(50); + + int size; + pull.Receive(null, SocketFlags.DontWait, out size); + if (pull.ReceiveStatus == ReceiveStatus.Received) + _receivedMessageCount++; + } + }; + } + + [Test] + public void ShouldReceiveAllMessages() + { + Assert.AreEqual(SendMessageCount, _receivedMessageCount); + } + } + + public class WhenTransferringWithAnInsufficientSendTimeout : UsingThreadedPushPull + { + private const int SendMessageCount = 5; + private const int SendHighWatermark = 1; + private const int ReceiveHighWatermark = 1; + private int _receivedMessageCount; + + public WhenTransferringWithAnInsufficientSendTimeout() + { + SenderInit = push => push.SendHighWatermark = SendHighWatermark; + SenderAction = push => + { + for (int i = 0; i < SendMessageCount; i++) + { + push.SendFrame(Messages.SingleMessage, TimeSpan.FromMilliseconds(5)); + } + }; + + // slow receiver with small HighWatermark + ReceiverInit = pull => pull.ReceiveHighWatermark = ReceiveHighWatermark; + ReceiverAction = pull => + { + Thread.Sleep(50); + + for (int i = 0; i < SendMessageCount; i++) + { + var frame = pull.ReceiveFrame(TimeSpan.FromMilliseconds(10)); + if (frame.ReceiveStatus == ReceiveStatus.Received) + _receivedMessageCount++; + } + }; + } + + [Test] + public void ShouldReceiveHighWatermarkMessages() + { + Assert.AreEqual(SendHighWatermark + ReceiveHighWatermark, _receivedMessageCount); + } + } } } diff --git a/src/ZeroMQ/ExecuteExtensions.cs b/src/ZeroMQ/ExecuteExtensions.cs index 8f42c92..3f7761c 100644 --- a/src/ZeroMQ/ExecuteExtensions.cs +++ b/src/ZeroMQ/ExecuteExtensions.cs @@ -9,7 +9,7 @@ internal static class ExecuteExtensions { public delegate TResult ThirdParamOut(T1 arg1, T2 arg2, out T3 arg3); - public static TResult WithTimeout(this ZmqSocket socket, Func method, T1 arg1, T2 arg2, TimeSpan timeout) + public static TResult WithReceiveTimeout(this ZmqSocket socket, Func method, T1 arg1, T2 arg2, TimeSpan timeout) { if ((int)timeout.TotalMilliseconds < 1) { @@ -37,7 +37,7 @@ public static TResult WithTimeout(this ZmqSocket socket, Func(this ZmqSocket socket, Func method, T1 arg1, T2 arg2, T3 arg3, TimeSpan timeout) + public static TResult WithSendTimeout(this ZmqSocket socket, Func method, T1 arg1, T2 arg2, T3 arg3, TimeSpan timeout) { if ((int)timeout.TotalMilliseconds < 1) { @@ -53,7 +53,7 @@ public static TResult WithTimeout(this ZmqSocket socket, Fu { receiveResult = method(arg1, arg2, arg3); - if (socket.ReceiveStatus != ReceiveStatus.TryAgain) + if (socket.SendStatus != SendStatus.TryAgain) { break; } @@ -65,7 +65,7 @@ public static TResult WithTimeout(this ZmqSocket socket, Fu return receiveResult; } - public static TResult WithTimeout(this ZmqSocket socket, ThirdParamOut method, T1 arg1, T2 arg2, out T3 arg3, TimeSpan timeout) + public static TResult WithReceiveTimeout(this ZmqSocket socket, ThirdParamOut method, T1 arg1, T2 arg2, out T3 arg3, TimeSpan timeout) { if ((int)timeout.TotalMilliseconds < 1) { diff --git a/src/ZeroMQ/ZmqSocket.cs b/src/ZeroMQ/ZmqSocket.cs index d846b92..b0c9219 100644 --- a/src/ZeroMQ/ZmqSocket.cs +++ b/src/ZeroMQ/ZmqSocket.cs @@ -550,7 +550,7 @@ public int Receive(byte[] buffer, TimeSpan timeout) { return timeout == TimeSpan.MaxValue ? Receive(buffer) - : this.WithTimeout(Receive, buffer, SocketFlags.DontWait, timeout); + : this.WithReceiveTimeout(Receive, buffer, SocketFlags.DontWait, timeout); } /// @@ -648,7 +648,7 @@ public byte[] Receive(byte[] buffer, TimeSpan timeout, out int size) } int receivedBytes; - byte[] message = this.WithTimeout(Receive, buffer, SocketFlags.DontWait, out receivedBytes, timeout); + byte[] message = this.WithReceiveTimeout(Receive, buffer, SocketFlags.DontWait, out receivedBytes, timeout); size = receivedBytes; @@ -782,7 +782,7 @@ public int Send(byte[] buffer, int size, SocketFlags flags, TimeSpan timeout) { return timeout == TimeSpan.MaxValue ? Send(buffer, size, flags & ~SocketFlags.DontWait) - : this.WithTimeout(Send, buffer, size, flags | SocketFlags.DontWait, timeout); + : this.WithSendTimeout(Send, buffer, size, flags | SocketFlags.DontWait, timeout); } ///