From 8c7d9a551ca50a4102fae0d9c14805e0ae86bc4f Mon Sep 17 00:00:00 2001 From: f Date: Wed, 23 Apr 2025 20:19:32 +0700 Subject: [PATCH 1/9] perf:last node run in current thread directly --- .../clients/jedis/MultiNodePipelineBase.java | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 247069410a..aa3bd62d25 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -7,6 +7,7 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -85,17 +86,20 @@ public final void sync() { } syncing = true; - ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); + boolean onlyOneNode = pipelinedResponses.size() == 1; + ExecutorService executorService = onlyOneNode ? null : Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); + CountDownLatch countDownLatch = onlyOneNode ? null : new CountDownLatch(pipelinedResponses.size()); - CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size()); - Iterator>>> pipelinedResponsesIterator - = pipelinedResponses.entrySet().iterator(); - while (pipelinedResponsesIterator.hasNext()) { - Map.Entry>> entry = pipelinedResponsesIterator.next(); + Iterator>>> iter = pipelinedResponses.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry>> entry = iter.next(); HostAndPort nodeKey = entry.getKey(); Queue> queue = entry.getValue(); Connection connection = connections.get(nodeKey); - executorService.submit(() -> { + + // last node run in current thread directly + Executor executor = iter.hasNext() ? executorService : Runnable::run; + executor.execute(() -> { try { List unformatted = connection.getMany(queue.size()); for (Object o : unformatted) { @@ -104,22 +108,26 @@ public final void sync() { } catch (JedisConnectionException jce) { log.error("Error with connection to " + nodeKey, jce); // cleanup the connection - pipelinedResponsesIterator.remove(); + iter.remove(); connections.remove(nodeKey); IOUtils.closeQuietly(connection); } finally { - countDownLatch.countDown(); + if (!onlyOneNode) { + countDownLatch.countDown(); + } } }); } - try { - countDownLatch.await(); - } catch (InterruptedException e) { - log.error("Thread is interrupted during sync.", e); - } + if (!onlyOneNode) { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + log.error("Thread is interrupted during sync.", e); + } - executorService.shutdownNow(); + executorService.shutdownNow(); + } syncing = false; } From 0049d4ba6da401e8be1799bb3e1d56723998efd5 Mon Sep 17 00:00:00 2001 From: f Date: Wed, 23 Apr 2025 20:26:08 +0700 Subject: [PATCH 2/9] fix: connection leak, we should return it to connection pool --- .../java/redis/clients/jedis/MultiNodePipelineBase.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index aa3bd62d25..b4da89d199 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -108,13 +108,14 @@ public final void sync() { } catch (JedisConnectionException jce) { log.error("Error with connection to " + nodeKey, jce); // cleanup the connection + // TODO these operations not thread-safe iter.remove(); connections.remove(nodeKey); - IOUtils.closeQuietly(connection); } finally { - if (!onlyOneNode) { - countDownLatch.countDown(); - } + IOUtils.closeQuietly(connection); + if (!onlyOneNode) { + countDownLatch.countDown(); + } } }); } From 88532c3a52b4f1dee9697619a880175ab5b10527 Mon Sep 17 00:00:00 2001 From: f Date: Wed, 23 Apr 2025 20:29:58 +0700 Subject: [PATCH 3/9] noop sync when pipelinedResponses.isEmpty() --- src/main/java/redis/clients/jedis/MultiNodePipelineBase.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index b4da89d199..b4f86e28fb 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -81,6 +81,10 @@ public void close() { @Override public final void sync() { + if (pipelinedResponses.isEmpty()) { + return; + } + if (syncing) { return; } From 4d111b1d375e1843c55fd48267707b6d1c1da52d Mon Sep 17 00:00:00 2001 From: f Date: Wed, 23 Apr 2025 20:44:59 +0700 Subject: [PATCH 4/9] revert rename --- .../redis/clients/jedis/MultiNodePipelineBase.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index b4f86e28fb..8abaa6ef2f 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -94,15 +94,16 @@ public final void sync() { ExecutorService executorService = onlyOneNode ? null : Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); CountDownLatch countDownLatch = onlyOneNode ? null : new CountDownLatch(pipelinedResponses.size()); - Iterator>>> iter = pipelinedResponses.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry>> entry = iter.next(); + Iterator>>> pipelinedResponsesIterator + = pipelinedResponses.entrySet().iterator(); + while (pipelinedResponsesIterator.hasNext()) { + Map.Entry>> entry = pipelinedResponsesIterator.next(); HostAndPort nodeKey = entry.getKey(); Queue> queue = entry.getValue(); Connection connection = connections.get(nodeKey); // last node run in current thread directly - Executor executor = iter.hasNext() ? executorService : Runnable::run; + Executor executor = pipelinedResponsesIterator.hasNext() ? executorService : Runnable::run; executor.execute(() -> { try { List unformatted = connection.getMany(queue.size()); @@ -113,7 +114,7 @@ public final void sync() { log.error("Error with connection to " + nodeKey, jce); // cleanup the connection // TODO these operations not thread-safe - iter.remove(); + pipelinedResponsesIterator.remove(); connections.remove(nodeKey); } finally { IOUtils.closeQuietly(connection); From 9e93233f9794b208e0a4dfe268c8138027245710 Mon Sep 17 00:00:00 2001 From: f Date: Thu, 24 Apr 2025 00:03:26 +0700 Subject: [PATCH 5/9] clean --- .../clients/jedis/MultiNodePipelineBase.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 8abaa6ef2f..e8f7374975 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -81,18 +81,14 @@ public void close() { @Override public final void sync() { - if (pipelinedResponses.isEmpty()) { - return; - } - if (syncing) { return; } syncing = true; - boolean onlyOneNode = pipelinedResponses.size() == 1; - ExecutorService executorService = onlyOneNode ? null : Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); - CountDownLatch countDownLatch = onlyOneNode ? null : new CountDownLatch(pipelinedResponses.size()); + boolean multiNode = pipelinedResponses.size() > 1; + ExecutorService executorService = multiNode ? Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS) : null; + CountDownLatch countDownLatch = multiNode ? new CountDownLatch(pipelinedResponses.size()) : null; Iterator>>> pipelinedResponsesIterator = pipelinedResponses.entrySet().iterator(); @@ -113,19 +109,19 @@ public final void sync() { } catch (JedisConnectionException jce) { log.error("Error with connection to " + nodeKey, jce); // cleanup the connection - // TODO these operations not thread-safe + // TODO these operations not thread-safe and pipelinedResponsesIterator.remove(); connections.remove(nodeKey); - } finally { IOUtils.closeQuietly(connection); - if (!onlyOneNode) { + } finally { + if (multiNode) { countDownLatch.countDown(); } } }); } - if (!onlyOneNode) { + if (multiNode) { try { countDownLatch.await(); } catch (InterruptedException e) { From 306ab36fbbd1ca794a92256c05692dd3b4555a42 Mon Sep 17 00:00:00 2001 From: f Date: Thu, 24 Apr 2025 13:06:32 +0700 Subject: [PATCH 6/9] remove last node run in current thread when multi node --- .../redis/clients/jedis/MultiNodePipelineBase.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index e8f7374975..9e9709bcd7 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -87,7 +87,14 @@ public final void sync() { syncing = true; boolean multiNode = pipelinedResponses.size() > 1; - ExecutorService executorService = multiNode ? Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS) : null; + Executor executor; + ExecutorService executorService = null; + if (multiNode) { + executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); + executor = executorService; + } else { + executor = Runnable::run; + } CountDownLatch countDownLatch = multiNode ? new CountDownLatch(pipelinedResponses.size()) : null; Iterator>>> pipelinedResponsesIterator @@ -97,9 +104,6 @@ public final void sync() { HostAndPort nodeKey = entry.getKey(); Queue> queue = entry.getValue(); Connection connection = connections.get(nodeKey); - - // last node run in current thread directly - Executor executor = pipelinedResponsesIterator.hasNext() ? executorService : Runnable::run; executor.execute(() -> { try { List unformatted = connection.getMany(queue.size()); @@ -109,7 +113,7 @@ public final void sync() { } catch (JedisConnectionException jce) { log.error("Error with connection to " + nodeKey, jce); // cleanup the connection - // TODO these operations not thread-safe and + // TODO these operations not thread-safe and when executed here, the iter may moved pipelinedResponsesIterator.remove(); connections.remove(nodeKey); IOUtils.closeQuietly(connection); From 317d123171d7408fbb8c6a44dd6be22d8026c8cd Mon Sep 17 00:00:00 2001 From: f Date: Tue, 29 Apr 2025 22:26:05 +0700 Subject: [PATCH 7/9] add test for pipeline all keys at same node --- .../clients/jedis/ClusterPipeliningTest.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index 0c6667d2f9..dba1745656 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -1112,6 +1112,48 @@ public void multiple() { } } + @Test + public void testPipelineKeysAtSameNode() { + try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) { + + // test simple key + cluster.set("foo", "bar"); + + try (ClusterPipeline pipeline = cluster.pipelined()) { + Response foo = pipeline.get("foo"); + pipeline.sync(); + + assertEquals("bar", foo.get()); + } + + // test multi key but at same node + int cnt = 3; + String prefix = "{foobar}:"; + for (int i = 0; i < cnt; i++) { + String key = prefix + i; + cluster.set(key, String.valueOf(i)); + } + + try (ClusterPipeline pipeline = cluster.pipelined()) { + List> results = new ArrayList<>(); + for (int i = 0; i < cnt; i++) { + String key = prefix + i; + results.add(pipeline.get(key)); + } + + Response foo = pipeline.eval("return redis.call('get', KEYS[1])", Collections.singletonList("foo"), Collections.emptyList()); + + pipeline.sync(); + int idx = 0; + for (Response res : results) { + assertEquals(String.valueOf(idx), res.get()); + idx++; + } + assertEquals("bar", String.valueOf(foo.get())); + } + } + } + private static void assertThreadsCount() { // Get the root thread group final ThreadGroup rootGroup = Thread.currentThread().getThreadGroup().getParent(); From 7f6d318d34b78b1012667dac908d140975d0aea2 Mon Sep 17 00:00:00 2001 From: f Date: Wed, 30 Apr 2025 17:36:08 +0700 Subject: [PATCH 8/9] fix: make all keys on same node --- src/test/java/redis/clients/jedis/ClusterPipeliningTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index dba1745656..54f1924426 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -1128,7 +1128,7 @@ public void testPipelineKeysAtSameNode() { // test multi key but at same node int cnt = 3; - String prefix = "{foobar}:"; + String prefix = "{foo}:"; for (int i = 0; i < cnt; i++) { String key = prefix + i; cluster.set(key, String.valueOf(i)); From 5899069cbc48664ffe738af8c602a551ac5f3071 Mon Sep 17 00:00:00 2001 From: ggivo Date: Wed, 7 May 2025 13:17:51 +0300 Subject: [PATCH 9/9] formatting --- .../java/redis/clients/jedis/MultiNodePipelineBase.java | 8 +++++--- .../java/redis/clients/jedis/ClusterPipeliningTest.java | 3 ++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 9e9709bcd7..05bcbaaced 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -95,10 +95,12 @@ public final void sync() { } else { executor = Runnable::run; } - CountDownLatch countDownLatch = multiNode ? new CountDownLatch(pipelinedResponses.size()) : null; + CountDownLatch countDownLatch = multiNode + ? new CountDownLatch(pipelinedResponses.size()) + : null; - Iterator>>> pipelinedResponsesIterator - = pipelinedResponses.entrySet().iterator(); + Iterator>>> pipelinedResponsesIterator = pipelinedResponses.entrySet() + .iterator(); while (pipelinedResponsesIterator.hasNext()) { Map.Entry>> entry = pipelinedResponsesIterator.next(); HostAndPort nodeKey = entry.getKey(); diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index 54f1924426..f4d6a9bf07 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -1141,7 +1141,8 @@ public void testPipelineKeysAtSameNode() { results.add(pipeline.get(key)); } - Response foo = pipeline.eval("return redis.call('get', KEYS[1])", Collections.singletonList("foo"), Collections.emptyList()); + Response foo = pipeline.eval("return redis.call('get', KEYS[1])", + Collections.singletonList("foo"), Collections.emptyList()); pipeline.sync(); int idx = 0;