11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/package.scala
@@ -21,8 +21,19 @@ package org.apache

 import java.util.Properties
 
+import org.apache.arrow.memory.RootAllocator
+
 package object comet {
 
+  /**
+   * The root allocator for Comet execution. Because Arrow Java memory management is based on
+   * reference counting, exposed arrays increase the reference count of the underlying buffers.
+   * Until the reference count reaches zero, the memory is not released. If a consumer is still
+   * holding references when an allocator is closed, the allocator will report the memory as
+   * leaked. To avoid this, we use a single allocator for the whole execution process.
+   */
+  val CometArrowAllocator = new RootAllocator(Long.MaxValue)
+
   /**
    * Provides access to build information about the Comet libraries. This will be used by the
    * benchmarking software to provide the source revision and repository. In addition, the build
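To make the comment's rationale concrete, here is a minimal, hypothetical sketch (not code from this PR) of how Arrow Java's reference counting interacts with allocator lifetime; the `AllocatorLifetimeSketch` object and the "per-reader" child name are made up for illustration:

import org.apache.arrow.memory.RootAllocator

object AllocatorLifetimeSketch {
  def main(args: Array[String]): Unit = {
    val root = new RootAllocator(Long.MaxValue)
    val child = root.newChildAllocator("per-reader", 0, Long.MaxValue)

    val buf = child.buffer(1024) // allocation accounted to `child`; refcount = 1

    // If a consumer still held `buf` here, child.close() would throw an
    // IllegalStateException reporting leaked memory, even though the buffer is
    // still legitimately in use. A single long-lived allocator avoids ever
    // closing while buffers are outstanding.
    buf.close()   // refcount drops to 0; memory returns to the allocator
    child.close() // safe now: no outstanding allocations
    root.close()
  }
}

This is why NativeUtil and StreamReader below can stop owning (and closing) their own allocators and simply borrow the shared CometArrowAllocator.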
common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
@@ -22,18 +22,18 @@ package org.apache.comet.vector
 import scala.collection.mutable
 
 import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider, Data}
-import org.apache.arrow.memory.RootAllocator
 import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.dictionary.DictionaryProvider
 import org.apache.spark.SparkException
 import org.apache.spark.sql.comet.util.Utils
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
+import org.apache.comet.CometArrowAllocator
+
 class NativeUtil {
   import Utils._
 
-  private val allocator = new RootAllocator(Long.MaxValue)
-    .newChildAllocator(this.getClass.getSimpleName, 0, Long.MaxValue)
+  private val allocator = CometArrowAllocator
   private val dictionaryProvider: CDataDictionaryProvider = new CDataDictionaryProvider
   private val importer = new ArrowImporter(allocator)
 
12 changes: 5 additions & 7 deletions common/src/main/scala/org/apache/comet/vector/StreamReader.scala
@@ -21,20 +21,20 @@ package org.apache.comet.vector

 import java.nio.channels.ReadableByteChannel
 
-import org.apache.arrow.memory.RootAllocator
 import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel}
 import org.apache.arrow.vector.ipc.message.MessageChannelReader
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
+import org.apache.comet.CometArrowAllocator
+
 /**
  * A reader that consumes Arrow data from an input channel, and produces Comet batches.
  */
 case class StreamReader(channel: ReadableByteChannel, source: String) extends AutoCloseable {
-  private var allocator = new RootAllocator(Long.MaxValue)
-    .newChildAllocator(s"${this.getClass.getSimpleName}/$source", 0, Long.MaxValue)
-  private val channelReader = new MessageChannelReader(new ReadChannel(channel), allocator)
-  private var arrowReader = new ArrowStreamReader(channelReader, allocator)
+  private val channelReader =
+    new MessageChannelReader(new ReadChannel(channel), CometArrowAllocator)
+  private var arrowReader = new ArrowStreamReader(channelReader, CometArrowAllocator)
   private var root = arrowReader.getVectorSchemaRoot
 
   def nextBatch(): Option[ColumnarBatch] = {
@@ -53,11 +53,9 @@ case class StreamReader(channel: ReadableByteChannel, source: String) extends AutoCloseable {
     if (root != null) {
       arrowReader.close()
       root.close()
-      allocator.close()
 
       arrowReader = null
       root = null
-      allocator = null
     }
   }
 }
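For orientation, a hedged sketch of the consume loop that StreamReader wraps; `StreamReadSketch.readAll` is an illustrative helper rather than Comet API, and it assumes the channel carries Arrow IPC stream bytes:

import java.nio.channels.ReadableByteChannel

import org.apache.arrow.vector.ipc.ArrowStreamReader

import org.apache.comet.CometArrowAllocator

object StreamReadSketch {
  // Drain an Arrow IPC stream using the shared allocator. Closing the reader
  // releases the buffers it still owns; the shared allocator itself is never
  // closed, so batches already handed to consumers stay valid until released.
  def readAll(channel: ReadableByteChannel): Unit = {
    val reader = new ArrowStreamReader(channel, CometArrowAllocator)
    try {
      val root = reader.getVectorSchemaRoot
      while (reader.loadNextBatch()) {
        println(s"batch with ${root.getRowCount} rows")
      }
    } finally {
      reader.close()
    }
  }
}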
spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
@@ -27,10 +27,6 @@ import org.apache.comet.CometConf

 class CometTPCDSQuerySuite
     extends {
-      override val excludedTpcdsQueries: Set[String] = Set()
-
-      // This is private in `TPCDSBase` and `excludedTpcdsQueries` is private too.
-      // So we cannot override `excludedTpcdsQueries` to exclude the queries.
       val tpcdsAllQueries: Seq[String] = Seq(
         "q1",
         "q2",
@@ -112,7 +108,9 @@ class CometTPCDSQuerySuite
"q69",
"q70",
"q71",
"q72",
// TODO: unknown failure (seems memory usage over Github action runner) in CI with q72 in
// https://github.com/apache/datafusion-comet/pull/613.
// "q72",
Comment on lines +111 to +113

[Member] I am +1 on skipping the official q72 query by default (because it is so ridiculous), especially in CI. However, maybe we should consider running an optimized version where the join order is sensible, which makes it at least 10x faster and uses far less memory. I will file a follow-on issue to discuss this.

[Member] The purpose of q72 is to test vendors' join-reordering rules, and that isn't really relevant to Spark or Comet, since Spark queries typically don't have access to statistics.

[Member] This is the version I have been using locally. Since we are not aiming to run the official TPC-DS benchmarks, but just our derived benchmarks, and given that we are comparing Spark to Comet on the same queries, I think this would be fine to use by default as long as it is well documented in our benchmarking guide.

I do think we should still test with the original q72 as a separate exercise, though, because if Spark can run it then Comet should be able to as well (with the same memory configuration).

select  i_item_desc
      ,w_warehouse_name
      ,d1.d_week_seq
      ,sum(case when p_promo_sk is null then 1 else 0 end) no_promo
      ,sum(case when p_promo_sk is not null then 1 else 0 end) promo
      ,count(*) total_cnt
from catalog_sales
join date_dim d1 on (cs_sold_date_sk = d1.d_date_sk)
join customer_demographics on (cs_bill_cdemo_sk = cd_demo_sk)
join household_demographics on (cs_bill_hdemo_sk = hd_demo_sk)
join item on (i_item_sk = cs_item_sk)
join inventory on (cs_item_sk = inv_item_sk)
join warehouse on (w_warehouse_sk=inv_warehouse_sk)
join date_dim d2 on (inv_date_sk = d2.d_date_sk)
join date_dim d3 on (cs_ship_date_sk = d3.d_date_sk)
left outer join promotion on (cs_promo_sk=p_promo_sk)
left outer join catalog_returns on (cr_item_sk = cs_item_sk and cr_order_number = cs_order_number)
where d1.d_week_seq = d2.d_week_seq
  and inv_quantity_on_hand < cs_quantity 
  and d3.d_date > d1.d_date + 5
  and hd_buy_potential = '501-1000'
  and d1.d_year = 1999
  and cd_marital_status = 'S'
group by i_item_desc,w_warehouse_name,d1.d_week_seq
order by total_cnt desc, i_item_desc, w_warehouse_name, d_week_seq
LIMIT 100;

[Member Author] > The purpose of q72 is to test vendors' join-reordering rules, and that isn't really relevant to Spark or Comet, since Spark queries typically don't have access to statistics.

By the way, Spark does have the capacity to do join reordering when statistics are available, but that relies on enabling CBO features, which are disabled by default.
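For context on the CBO remark, a hedged sketch of the stock Spark settings involved (standard Spark SQL configuration keys, nothing Comet-specific; join reordering only has an effect once statistics have been collected):

import org.apache.spark.sql.SparkSession

object CboSketch {
  def main(args: Array[String]): Unit = {
    // Cost-based join reordering in vanilla Spark; both flags default to false.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.cbo.enabled", "true")
      .config("spark.sql.cbo.joinReorder.enabled", "true")
      .getOrCreate()

    // The reorder rule consumes table/column statistics, e.g. for q72's inputs:
    spark.sql("ANALYZE TABLE catalog_sales COMPUTE STATISTICS")
    spark.sql("ANALYZE TABLE inventory COMPUTE STATISTICS FOR ALL COLUMNS")
  }
}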

[Member Author] > I am +1 on skipping the official q72 query by default (because it is so ridiculous), especially in CI. However, maybe we should consider running an optimized version where the join order is sensible, which makes it at least 10x faster and uses far less memory. I will file a follow-on issue to discuss this.

Sounds good to me.

[Member Author] > I do think we should still test with the original q72 as a separate exercise, though, because if Spark can run it then Comet should be able to as well (with the same memory configuration).

Yeah. As I mentioned earlier, I will investigate q72 further to see why it requires extra memory in Comet; disabling it for now just unblocks this PR.

"q73",
"q74",
"q75",
@@ -141,9 +139,45 @@ class CometTPCDSQuerySuite
"q98",
"q99")

// TODO: enable the 3 queries after fixing the issues #1358.
override val tpcdsQueries: Seq[String] =
tpcdsAllQueries.filterNot(excludedTpcdsQueries.contains)
val tpcdsAllQueriesV2_7_0: Seq[String] = Seq(
"q5a",
"q6",
"q10a",
"q11",
"q12",
"q14",
"q14a",
"q18a",
"q20",
"q22",
"q22a",
"q24",
"q27a",
"q34",
"q35",
"q35a",
"q36a",
"q47",
"q49",
"q51a",
"q57",
"q64",
"q67a",
"q70a",
// TODO: unknown failure (seems memory usage over Github action runner) in CI with q72-v2.7
// in https://github.com/apache/datafusion-comet/pull/613.
// "q72",
[Member Author] In the latest run I saw "Error: Process completed with exit code 143." It seems the memory usage exceeds what the GitHub Actions runner provides.

[Member Author] I found that a few queries (q72, q16) seem to use more memory than others. q72 cannot currently run under the sort-merge-join config on the CI runner due to its resource limit, but I can run it locally.

[Member Author] I will investigate the two queries further, but they do not seem related to the changes here.

"q74",
"q75",
"q77a",
"q78",
"q80a",
"q86a",
"q98")

override val tpcdsQueries: Seq[String] = tpcdsAllQueries

override val tpcdsQueriesV2_7_0: Seq[String] = tpcdsAllQueriesV2_7_0
}
with CometTPCDSQueryTestSuite
with ShimCometTPCDSQuerySuite {
@@ -157,9 +191,11 @@ class CometTPCDSQuerySuite
     conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
-    conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "20g")
+    conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "15g")
+    conf.set(CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key, "true")
+    conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
Comment on lines +195 to +196

[Member] Before we can close #387 we should either change the default for COMET_SHUFFLE_ENFORCE_MODE_ENABLED or remove it completely.

[Member] This can be a separate PR, but we should not close the issue when we merge this one.

[Member Author] Let me create a separate issue for this.

[Member Author] Created #648 for this PR.

     conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
-    conf.set(MEMORY_OFFHEAP_SIZE.key, "20g")
+    conf.set(MEMORY_OFFHEAP_SIZE.key, "15g")
     conf
   }

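As a closing usage note, a hedged sketch of the equivalent memory setup outside the test suite, using the same constants the diff touches; the 15g values simply mirror what the suite now sets for the CI runner and should be sized to the actual environment:

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE}

import org.apache.comet.CometConf

object CometMemorySetupSketch {
  // Mirrors the suite's configuration above: Comet's memory overhead plus
  // Spark's off-heap pool, which must be enabled and sized together.
  def build(): SparkConf =
    new SparkConf()
      .set(CometConf.COMET_EXEC_ENABLED.key, "true")
      .set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
      .set(CometConf.COMET_MEMORY_OVERHEAD.key, "15g")
      .set(MEMORY_OFFHEAP_ENABLED.key, "true")
      .set(MEMORY_OFFHEAP_SIZE.key, "15g")
}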