Skip to content

Commit c85a076

Browse files
committed
[HBASE-27991] fixing ClassCastException in multithread client run
1 parent 7cfa47d commit c85a076

File tree

2 files changed

+98
-1
lines changed

2 files changed

+98
-1
lines changed

hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/MultiThreadedClientExample.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.concurrent.ExecutorService;
2525
import java.util.concurrent.Executors;
2626
import java.util.concurrent.ForkJoinPool;
27+
import java.util.concurrent.ThreadPoolExecutor;
28+
import java.util.concurrent.LinkedBlockingQueue;
2729
import java.util.concurrent.Future;
2830
import java.util.concurrent.ThreadFactory;
2931
import java.util.concurrent.ThreadLocalRandom;
@@ -129,7 +131,14 @@ public int run(String[] args) throws Exception {
129131
//
130132
// We don't want to mix hbase and business logic.
131133
//
132-
ExecutorService service = new ForkJoinPool(threads * 2);
134+
ThreadPoolExecutor service = new ThreadPoolExecutor(
135+
threads * 2,
136+
threads * 2,
137+
60L,
138+
TimeUnit.SECONDS,
139+
new LinkedBlockingQueue<>()
140+
);
141+
133142

134143
// Create two different connections showing how it's possible to
135144
// separate different types of requests onto different connections
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client.example;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertNotEquals;
22+
23+
import java.io.IOException;
24+
import org.apache.hadoop.hbase.HBaseClassTestRule;
25+
import org.apache.hadoop.hbase.HBaseTestingUtility;
26+
import org.apache.hadoop.hbase.TableName;
27+
import org.apache.hadoop.hbase.client.Result;
28+
import org.apache.hadoop.hbase.client.ResultScanner;
29+
import org.apache.hadoop.hbase.client.Scan;
30+
import org.apache.hadoop.hbase.client.Table;
31+
import org.apache.hadoop.hbase.testclassification.ClientTests;
32+
import org.apache.hadoop.hbase.testclassification.MediumTests;
33+
import org.apache.hadoop.hbase.util.Bytes;
34+
import org.junit.AfterClass;
35+
import org.junit.BeforeClass;
36+
import org.junit.ClassRule;
37+
import org.junit.Test;
38+
import org.junit.experimental.categories.Category;
39+
40+
@Category({ ClientTests.class, MediumTests.class })
41+
public class TestMultiThreadedClientExample {
42+
43+
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
44+
private static String tableName = "test_mt_table";
45+
private static Table table;
46+
static final TableName MY_TABLE_NAME = TableName.valueOf(tableName);
47+
private static byte[] familyName = Bytes.toBytes("d");
48+
private static byte[] columnName = Bytes.toBytes("col");
49+
50+
@ClassRule
51+
public static final HBaseClassTestRule CLASS_RULE =
52+
HBaseClassTestRule.forClass(TestMultiThreadedClientExample.class);
53+
54+
@BeforeClass
55+
public static void setup() throws Exception {
56+
TEST_UTIL.startMiniCluster(1);
57+
table = TEST_UTIL.createTable(MY_TABLE_NAME, familyName);
58+
}
59+
60+
@AfterClass
61+
public static void tearDown() throws Exception {
62+
TEST_UTIL.deleteTable(MY_TABLE_NAME);
63+
TEST_UTIL.shutdownMiniCluster();
64+
}
65+
66+
@Test
67+
public void testMultiThreadedClientExample() throws Exception {
68+
MultiThreadedClientExample example = new MultiThreadedClientExample();
69+
example.setConf(TEST_UTIL.getConfiguration());
70+
String[] args = { tableName, "200" };
71+
// Define assertions to check the returned data here
72+
assertEquals(0, example.run(args));
73+
// Define assertions to check the row count of the table
74+
assertNotEquals(0, getRowCountFromTable(table));
75+
}
76+
77+
// getRowCountFromTable takes table as input and return the number of rows
78+
private static int getRowCountFromTable(Table t) throws IOException {
79+
Scan scan = new Scan();
80+
// Scan the table and get the scan result
81+
ResultScanner scanner = t.getScanner(scan);
82+
int rowCount = 0;
83+
for (Result result : scanner) {
84+
rowCount++;
85+
}
86+
return rowCount;
87+
}
88+
}

0 commit comments

Comments
 (0)