@pull pull bot commented Nov 5, 2022

See Commits and Changes for more details.



### What changes were proposed in this pull request?

This is an update of #29178, which was closed because the root cause of the error was only vaguely identified there; here I give an explanation of why `HiveHBaseTableInputFormat` does not work well with `NewHadoopRDD` (see the next section).

The PR modifies `TableReader.scala` to create an `OldHadoopRDD` when the input format is `org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat`.
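
A minimal sketch of the kind of selection this implies (illustrative only; the object and method names below are assumptions for this description, not the actual patch to `TableReader.scala`):

```scala
// Hedged sketch: use the old mapred-based RDD for the Hive HBase handler input format,
// because only the old-API getSplits path initialises the HBase table (see the analysis below).
object HadoopRddSelection {
  private val HiveHBaseInputFormat =
    "org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat"

  /** True when the old `org.apache.hadoop.mapred` based HadoopRDD should be used. */
  def useOldHadoopRDD(inputFormatClassName: String): Boolean =
    inputFormatClassName == HiveHBaseInputFormat

  def main(args: Array[String]): Unit = {
    println(useOldHadoopRDD("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat")) // true
    println(useOldHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat"))  // false
  }
}
```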

- environments (Cloudera distribution 7.1.7.SP1):
  - hadoop 3.1.1
  - hive 3.1.300
  - spark 3.2.1
  - hbase 2.2.3

### Why are the changes needed?

With the `NewHadoopRDD` the following exception is raised:

```
java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:253)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:446)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:765)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
  ... 47 elided
Caused by: java.lang.IllegalStateException: The input format instance has not been properly initialized. Ensure you call initializeTable either in your constructor or initialize method
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:557)
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:248)
  ... 86 more
```

### Short summary of the root cause

There are two interfaces:

- the new `org.apache.hadoop.mapreduce.InputFormat`: provides the one-arg method `getSplits(JobContext context)` (returning `List<InputSplit>`)
- the old `org.apache.hadoop.mapred.InputFormat`: provides the two-arg method `getSplits(JobConf job, int numSplits)` (returning `InputSplit[]`)

In Hive both are implemented by `HiveHBaseTableInputFormat`, but only the old method performs the required initialisation, and this is why `NewHadoopRDD` fails here.
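
A simplified, self-contained illustration of that mismatch (a stand-in class written for this description, not the real Hive/HBase code):

```scala
// Hedged illustration: only the old-API entry point initialises the table, so invoking
// the new-API entry point directly (as NewHadoopRDD does) hits the uninitialised state.
class IllustrativeHBaseInputFormat {
  private var table: Option[String] = None

  // old org.apache.hadoop.mapred.InputFormat style: getSplits(JobConf, int)
  def getSplits(jobConf: AnyRef, numSplits: Int): Array[String] = {
    initializeTable("hbase_test")   // initialisation happens only on this path
    Array.fill(numSplits)("split")
  }

  // new org.apache.hadoop.mapreduce.InputFormat style: getSplits(JobContext)
  def getSplits(jobContext: AnyRef): Seq[String] = {
    val t = table.getOrElse(throw new IllegalStateException(
      "The input format instance has not been properly initialized"))
    Seq(s"split-for-$t")
  }

  private def initializeTable(name: String): Unit = table = Some(name)
}
```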

### Detailed analysis

All the links below refer to the latest commits of the master branches of the mentioned components at the time of writing this description (so the line numbers stay correct in the future too, as `master` itself is a moving target).

Spark, in `NewHadoopRDD`, uses the new interface, which provides the one-arg method:
https://github.com/apache/spark/blob/5556cfc59aa97a3ad4ea0baacebe19859ec0bcb7/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136

Hive, on the other hand, binds the initialisation to the two-arg method coming from the old interface.
See [Hive#getSplits](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L268):

```java
  @Override public InputSplit[] getSplits(final JobConf jobConf, final int numSplits) throws IOException {
```

This calls `getSplitsInternal`, which also contains the [initialisation](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L299):
```java
    initializeTable(conn, tableName);
```

Interestingly, Hive also uses the one-arg method internally within `getSplitsInternal` [here](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L356), but by then the initialisation has already been done.

Calling the new interface method (which is what `NewHadoopRDD` does) goes straight to the HBase method [org.apache.hadoop.hbase.mapreduce.TableInputFormatBase#getSplits](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L230).

There a `JobContext`-based [initialisation](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L234-L237) would take place,
but by default it is an [empty method](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L628-L640):
```java
  /**
   * Handle subclass specific set up. Each of the entry points used by the MapReduce framework,
   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
   * retrieving the necessary configuration information and calling
   * {@link #initializeTable(Connection, TableName)}. Subclasses should implement their initialize
   * call such that it is safe to call multiple times. The current TableInputFormatBase
   * implementation relies on a non-null table reference to decide if an initialize call is needed,
   * but this behavior may change in the future. In particular, it is critical that initializeTable
   * not be called multiple times since this will leak Connection instances.
   */
  protected void initialize(JobContext context) throws IOException {
  }
```

This is not overridden by Hive, and it is hard to reason about why it should be (it is an internal Hive class), so it is easier to fix this in Spark.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

1) create the HBase table

```
hbase(main):001:0> create 'hbase_test', 'cf1'
hbase(main):002:0> put 'hbase_test', 'r1', 'cf1:v1', '123'
```

2) create the Hive table mapped to the HBase table

hive>
```sql
CREATE EXTERNAL TABLE `hivetest.hbase_test`(
  `key` string COMMENT '',
  `value` string COMMENT '')
ROW FORMAT SERDE
  'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY
  'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
  'hbase.columns.mapping'=':key,cf1:v1',
  'serialization.format'='1')
TBLPROPERTIES (
  'hbase.table.name'='hbase_test')
```
 

3) query the Hive table from spark-shell while the data resides in HBase

```
scala> spark.sql("select * from hivetest.hbase_test").show()
22/11/05 01:14:16 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
22/11/05 01:14:16 WARN client.HiveClientImpl: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = f05b6866-86df-4d88-9eea-f1c45043bb5f
+---+-----+
|key|value|
+---+-----+
| r1|  123|
+---+-----+
```

Closes #38516 from attilapiros/SPARK-32380.

Authored-by: attilapiros <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
@github-actions github-actions bot added the SQL label Nov 5, 2022
@pull pull bot added ⤵️ pull and removed SQL labels Nov 5, 2022
@pull pull bot merged commit 7009ef0 into wangyum:master Nov 6, 2022
pull bot pushed a commit that referenced this pull request Nov 22, 2024
…ead pool

### What changes were proposed in this pull request?

This PR aims to use a meaningful class name prefix for the REST Submission API thread pool instead of the default value of Jetty `QueuedThreadPool`, `"qtp"+super.hashCode()`.

https://github.com/dekellum/jetty/blob/3dc0120d573816de7d6a83e2d6a97035288bdd4a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java#L64
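
A minimal sketch of the idea, assuming the name is set through Jetty's `QueuedThreadPool.setName` (illustrative; not the exact Spark patch):

```scala
// Hedged sketch: name the Jetty worker pool so thread dumps show "StandaloneRestServer-<n>"
// instead of the default "qtp<hashCode>-<n>" thread names.
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.util.thread.QueuedThreadPool

object NamedRestServerSketch {
  def main(args: Array[String]): Unit = {
    val pool = new QueuedThreadPool(200)
    pool.setName("StandaloneRestServer") // prefix that shows up in jstack output
    pool.setDaemon(true)
    val server = new Server(pool)
    // connectors and handlers would be configured here before server.start()
  }
}
```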

### Why are the changes needed?

This is helpful during JVM investigation.

**BEFORE (4.0.0-preview2)**

```
$ SPARK_MASTER_OPTS='-Dspark.master.rest.enabled=true' sbin/start-master.sh
$ jstack 28217 | grep qtp
"qtp1925630411-52" #52 daemon prio=5 os_prio=31 cpu=0.07ms elapsed=19.06s tid=0x0000000134906c10 nid=0xde03 runnable  [0x0000000314592000]
"qtp1925630411-53" #53 daemon prio=5 os_prio=31 cpu=0.05ms elapsed=19.06s tid=0x0000000134ac6810 nid=0xc603 runnable  [0x000000031479e000]
"qtp1925630411-54" #54 daemon prio=5 os_prio=31 cpu=0.06ms elapsed=19.06s tid=0x000000013491ae10 nid=0xdc03 runnable  [0x00000003149aa000]
"qtp1925630411-55" #55 daemon prio=5 os_prio=31 cpu=0.08ms elapsed=19.06s tid=0x0000000134ac9810 nid=0xc803 runnable  [0x0000000314bb6000]
"qtp1925630411-56" #56 daemon prio=5 os_prio=31 cpu=0.04ms elapsed=19.06s tid=0x0000000134ac9e10 nid=0xda03 runnable  [0x0000000314dc2000]
"qtp1925630411-57" #57 daemon prio=5 os_prio=31 cpu=0.05ms elapsed=19.06s tid=0x0000000134aca410 nid=0xca03 runnable  [0x0000000314fce000]
"qtp1925630411-58" #58 daemon prio=5 os_prio=31 cpu=0.04ms elapsed=19.06s tid=0x0000000134acaa10 nid=0xcb03 runnable  [0x00000003151da000]
"qtp1925630411-59" #59 daemon prio=5 os_prio=31 cpu=0.06ms elapsed=19.06s tid=0x0000000134acb010 nid=0xcc03 runnable  [0x00000003153e6000]
"qtp1925630411-60-acceptor-0108e9815-ServerConnector1e497474{HTTP/1.1, (http/1.1)}{M3-Max.local:6066}" #60 daemon prio=3 os_prio=31 cpu=0.11ms elapsed=19.06s tid=0x00000001317ffa10 nid=0xcd03 runnable  [0x00000003155f2000]
"qtp1925630411-61-acceptor-11d90f2aa-ServerConnector1e497474{HTTP/1.1, (http/1.1)}{M3-Max.local:6066}" #61 daemon prio=3 os_prio=31 cpu=0.10ms elapsed=19.06s tid=0x00000001314ed610 nid=0xcf03 waiting on condition  [0x00000003157fe000]
```

**AFTER**
```
$ SPARK_MASTER_OPTS='-Dspark.master.rest.enabled=true' sbin/start-master.sh
$ jstack 28317 | grep StandaloneRestServer
"StandaloneRestServer-52" #52 daemon prio=5 os_prio=31 cpu=0.09ms elapsed=60.06s tid=0x00000001284a8e10 nid=0xdb03 runnable  [0x000000032cfce000]
"StandaloneRestServer-53" #53 daemon prio=5 os_prio=31 cpu=0.06ms elapsed=60.06s tid=0x00000001284acc10 nid=0xda03 runnable  [0x000000032d1da000]
"StandaloneRestServer-54" #54 daemon prio=5 os_prio=31 cpu=0.05ms elapsed=60.06s tid=0x00000001284ae610 nid=0xd803 runnable  [0x000000032d3e6000]
"StandaloneRestServer-55" #55 daemon prio=5 os_prio=31 cpu=0.09ms elapsed=60.06s tid=0x00000001284aec10 nid=0xd703 runnable  [0x000000032d5f2000]
"StandaloneRestServer-56" #56 daemon prio=5 os_prio=31 cpu=0.06ms elapsed=60.06s tid=0x00000001284af210 nid=0xc803 runnable  [0x000000032d7fe000]
"StandaloneRestServer-57" #57 daemon prio=5 os_prio=31 cpu=0.05ms elapsed=60.06s tid=0x00000001284af810 nid=0xc903 runnable  [0x000000032da0a000]
"StandaloneRestServer-58" #58 daemon prio=5 os_prio=31 cpu=0.06ms elapsed=60.06s tid=0x00000001284afe10 nid=0xcb03 runnable  [0x000000032dc16000]
"StandaloneRestServer-59" #59 daemon prio=5 os_prio=31 cpu=0.05ms elapsed=60.06s tid=0x00000001284b0410 nid=0xcc03 runnable  [0x000000032de22000]
"StandaloneRestServer-60-acceptor-04aefbaa8-ServerConnector44284d85{HTTP/1.1, (http/1.1)}{M3-Max.local:6066}" #60 daemon prio=3 os_prio=31 cpu=0.13ms elapsed=60.05s tid=0x000000015cda1a10 nid=0xcd03 runnable  [0x000000032e02e000]
"StandaloneRestServer-61-acceptor-148976251-ServerConnector44284d85{HTTP/1.1, (http/1.1)}{M3-Max.local:6066}" #61 daemon prio=3 os_prio=31 cpu=0.12ms elapsed=60.05s tid=0x000000015cd1c810 nid=0xce03 waiting on condition  [0x000000032e23a000]
```

### Does this PR introduce _any_ user-facing change?

No, the thread names are only observed during debugging.

### How was this patch tested?

Manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48924 from dongjoon-hyun/SPARK-50385.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: panbingkun <[email protected]>
pull bot pushed a commit that referenced this pull request Jul 21, 2025
…ingBuilder`

### What changes were proposed in this pull request?

This PR aims to improve `toString` by relying on `JEP-280` instead of `ToStringBuilder`. In addition, `Scalastyle` and `Checkstyle` rules are added to prevent a future regression.

### Why are the changes needed?

Since Java 9, `String Concatenation` has been handled better by default.

| ID | DESCRIPTION |
| - | - |
| JEP-280 | [Indify String Concatenation](https://openjdk.org/jeps/280) |

For example, this PR improves `OpenBlocks` like the following. Both Java source code and byte code are simplified a lot by utilizing JEP-280 properly.

**CODE CHANGE**
```diff
- return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-   .append("appId", appId)
-   .append("execId", execId)
-   .append("blockIds", Arrays.toString(blockIds))
-   .toString();
+ return "OpenBlocks[appId=" + appId + ",execId=" + execId + ",blockIds=" +
+     Arrays.toString(blockIds) + "]";
```

**BEFORE**
```
  public java.lang.String toString();
    Code:
       0: new           #39                 // class org/apache/commons/lang3/builder/ToStringBuilder
       3: dup
       4: aload_0
       5: getstatic     #41                 // Field org/apache/commons/lang3/builder/ToStringStyle.SHORT_PREFIX_STYLE:Lorg/apache/commons/lang3/builder/ToStringStyle;
       8: invokespecial #47                 // Method org/apache/commons/lang3/builder/ToStringBuilder."<init>":(Ljava/lang/Object;Lorg/apache/commons/lang3/builder/ToStringStyle;)V
      11: ldc           #50                 // String appId
      13: aload_0
      14: getfield      #7                  // Field appId:Ljava/lang/String;
      17: invokevirtual #51                 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
      20: ldc           #55                 // String execId
      22: aload_0
      23: getfield      #13                 // Field execId:Ljava/lang/String;
      26: invokevirtual #51                 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
      29: ldc           #56                 // String blockIds
      31: aload_0
      32: getfield      #16                 // Field blockIds:[Ljava/lang/String;
      35: invokestatic  #57                 // Method java/util/Arrays.toString:([Ljava/lang/Object;)Ljava/lang/String;
      38: invokevirtual #51                 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
      41: invokevirtual #61                 // Method org/apache/commons/lang3/builder/ToStringBuilder.toString:()Ljava/lang/String;
      44: areturn
```

**AFTER**
```
  public java.lang.String toString();
    Code:
       0: aload_0
       1: getfield      #7                  // Field appId:Ljava/lang/String;
       4: aload_0
       5: getfield      #13                 // Field execId:Ljava/lang/String;
       8: aload_0
       9: getfield      #16                 // Field blockIds:[Ljava/lang/String;
      12: invokestatic  #39                 // Method java/util/Arrays.toString:([Ljava/lang/Object;)Ljava/lang/String;
      15: invokedynamic #43,  0             // InvokeDynamic #0:makeConcatWithConstants:(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;
      20: areturn
```

### Does this PR introduce _any_ user-facing change?

No. This is a `toString` implementation improvement.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51572 from dongjoon-hyun/SPARK-52880.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>