Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,21 @@
import java.util.function.Function;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.monitoring.MetricType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;

/**
* ParallelPhoenixResultSet that provides the standard wait until at least one cluster completes
* approach
* ParallelPhoenixResultSet that provides the standard wait until at least one cluster completes. We
* close the idle result set to release server resources asynchronously.
*/
public class ParallelPhoenixResultSet extends DelegateResultSet
implements PhoenixMonitoredResultSet {

private static final Logger LOGGER = LoggerFactory.getLogger(ParallelPhoenixResultSet.class);

private final ParallelPhoenixContext context;
private final CompletableFuture<ResultSet> rs1, rs2;

Expand Down Expand Up @@ -80,9 +84,11 @@ public boolean next() throws SQLException {
try {
if (next1.isDone() && !next1.isCompletedExceptionally()) {
rs = rs1.get();
closeIdleResultSet(rs2);
return next1.get();
} else { // (next2.isDone() && !next2.isCompletedExceptionally())
rs = rs2.get();
closeIdleResultSet(rs1);
return next2.get();
}
} catch (Exception e) {
Expand Down Expand Up @@ -147,6 +153,22 @@ public void resetMetrics() {
context.resetMetrics();
}

/**
* Closes the idle result set to release server resources immediately. This is called after we've
* bound to the winning result set.
*/
private void closeIdleResultSet(CompletableFuture<ResultSet> idleFuture) {
idleFuture.whenComplete((resultSet, throwable) -> {
if (throwable == null && resultSet != null) {
try {
resultSet.close();
} catch (Exception e) {
LOGGER.warn("Failed to close idle result set: {}", e.getMessage(), e);
}
}
});
}

@SuppressWarnings("unchecked")
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;

Expand Down Expand Up @@ -75,6 +76,13 @@ public void testRS1WinsNext() throws Exception {
Executor rsExecutor2 = Mockito.mock(Executor.class);

CountDownLatch latch = new CountDownLatch(1);
CountDownLatch closeLatch = new CountDownLatch(1);

// Set up rs2 to notify when close() is called for async verification
doAnswer(invocation -> {
closeLatch.countDown();
return null;
}).when(rs2).close();

// inject a sleep
doAnswer((InvocationOnMock invocation) -> {
Expand Down Expand Up @@ -103,6 +111,25 @@ public void testRS1WinsNext() throws Exception {
resultSet.next();

assertEquals(rs1, resultSet.getResultSet());

// rs2 is not done yet, so it should NOT be closed immediately
Mockito.verify(rs2, Mockito.never()).close();

// Now complete rs2 and verify it gets closed asynchronously
latch.countDown();

// Wait for async close to happen (with timeout)
boolean closeHappened = closeLatch.await(2, TimeUnit.SECONDS);
assertTrue(closeHappened);

// Explicitly verify rs2 (idle) was closed
Mockito.verify(rs2).close();

// Verify rs1 (winner) was not closed
Mockito.verify(rs1, Mockito.never()).close();

rs1.close();
Mockito.verify(rs1).close();
}

@Test
Expand All @@ -112,6 +139,14 @@ public void testRS2WinsNext() throws Exception {

Executor rsExecutor1 = Mockito.mock(Executor.class);
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch closeLatch = new CountDownLatch(1);

// Set up rs1 to notify when close() is called for async verification
doAnswer(invocation -> {
closeLatch.countDown();
return null;
}).when(rs1).close();

// inject a sleep
doAnswer((InvocationOnMock invocation) -> {
Thread thread = new Thread(() -> {
Expand All @@ -137,6 +172,25 @@ public void testRS2WinsNext() throws Exception {
resultSet.next();

assertEquals(rs2, resultSet.getResultSet());

// rs1 is not done yet, so it should NOT be closed immediately
Mockito.verify(rs1, Mockito.never()).close();

// Complete rs1 and verify it gets closed asynchronously
latch.countDown();

// Wait for async close to happen (with timeout)
boolean closeHappened = closeLatch.await(2, TimeUnit.SECONDS);
assertTrue(closeHappened);

// Explicitly verify rs1 (idle) was closed
Mockito.verify(rs1).close();

// Verify rs2 (winner) was not closed
Mockito.verify(rs2, Mockito.never()).close();

rs2.close();
Mockito.verify(rs2).close();
}

@Test
Expand Down Expand Up @@ -171,6 +225,13 @@ public void testRS1FailsImmediatelyNext() throws Exception {
resultSet.next();

assertEquals(rs2, resultSet.getResultSet());

// rs1 failed exceptionally, so it should NOT be closed
// rs2 won and is the active result set, so it should NOT be closed
Mockito.verify(rs2, Mockito.never()).close();

// Cleanup
latch.countDown();
}

@Test
Expand All @@ -184,6 +245,14 @@ public void testRS1SucceedsDuringNext() throws Exception {
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
CountDownLatch closeLatch = new CountDownLatch(1);

// Set up rs2 to notify when close() is called for async verification
doAnswer(invocation -> {
closeLatch.countDown();
return null;
}).when(rs2).close();

// inject a sleep
doAnswer((InvocationOnMock invocation) -> {
Thread thread = new Thread(() -> {
Expand Down Expand Up @@ -245,7 +314,20 @@ public void testRS1SucceedsDuringNext() throws Exception {

assertEquals(rs1, resultSet.getResultSet());

// Cleanup
// rs2 is not done yet, so it should NOT be closed immediately
Mockito.verify(rs2, Mockito.never()).close();

// Now complete rs2 and verify it gets closed asynchronously
latch2.countDown();

// Wait for async close to happen (with timeout)
boolean closeHappened = closeLatch.await(2, TimeUnit.SECONDS);
assertTrue(closeHappened);

// Explicitly verify rs2 (idle) was closed
Mockito.verify(rs2).close();

// Verify rs1 (winner) was not closed
Mockito.verify(rs1, Mockito.never()).close();
}
}