Skip to content

Commit d905221

Browse files
committed
HBASE-26959 Brotli compression support (#4353)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
1 parent e2ac5a9 commit d905221

File tree

32 files changed

+1012
-47
lines changed

32 files changed

+1012
-47
lines changed

hbase-assembly/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -314,6 +314,10 @@
314314
<groupId>org.apache.hbase</groupId>
315315
<artifactId>hbase-compression-aircompressor</artifactId>
316316
</dependency>
317+
<dependency>
318+
<groupId>org.apache.hbase</groupId>
319+
<artifactId>hbase-compression-brotli</artifactId>
320+
</dependency>
317321
<dependency>
318322
<groupId>org.apache.hbase</groupId>
319323
<artifactId>hbase-compression-lz4</artifactId>

hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java

Lines changed: 33 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,6 @@
4545
public final class Compression {
4646
private static final Logger LOG = LoggerFactory.getLogger(Compression.class);
4747

48-
4948
// LZO
5049

5150
public static final String LZO_CODEC_CLASS_KEY =
@@ -97,6 +96,13 @@ public final class Compression {
9796
public static final String LZMA_CODEC_CLASS_DEFAULT =
9897
"org.apache.hadoop.hbase.io.compress.xz.LzmaCodec";
9998

99+
// Brotli
100+
101+
public static final String BROTLI_CODEC_CLASS_KEY =
102+
"hbase.io.compress.brotli.codec";
103+
public static final String BROTLI_CODEC_CLASS_DEFAULT =
104+
"org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec";
105+
100106
/**
101107
* Prevent the instantiation of class.
102108
*/
@@ -148,6 +154,7 @@ private static ClassLoader getClassLoaderForCodec() {
148154
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
149155
value="SE_TRANSIENT_FIELD_NOT_RESTORED",
150156
justification="We are not serializing so doesn't apply (not sure why transient though)")
157+
@SuppressWarnings("ImmutableEnumChecker")
151158
@InterfaceAudience.Public
152159
public static enum Algorithm {
153160
// LZO is GPL and requires extra install to setup. See
@@ -352,6 +359,31 @@ public CompressionCodec reload(Configuration conf) {
352359
return lzmaCodec;
353360
}
354361
}
362+
},
363+
364+
BROTLI("brotli", BROTLI_CODEC_CLASS_KEY, BROTLI_CODEC_CLASS_DEFAULT) {
365+
// Use base type to avoid compile-time dependencies.
366+
private volatile transient CompressionCodec brotliCodec;
367+
private final transient Object lock = new Object();
368+
@Override
369+
CompressionCodec getCodec(Configuration conf) {
370+
if (brotliCodec == null) {
371+
synchronized (lock) {
372+
if (brotliCodec == null) {
373+
brotliCodec = buildCodec(conf, this);
374+
}
375+
}
376+
}
377+
return brotliCodec;
378+
}
379+
@Override
380+
public CompressionCodec reload(Configuration conf) {
381+
synchronized (lock) {
382+
brotliCodec = buildCodec(conf, this);
383+
LOG.warn("Reloaded configuration for {}", name());
384+
return brotliCodec;
385+
}
386+
}
355387
};
356388

357389
private final Configuration conf;

hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,4 +35,16 @@ public static int roundInt2(int v) {
3535
return v;
3636
}
3737

38+
/**
39+
* Most compression algorithms can be presented with pathological input that causes an
40+
* expansion rather than a compression. Hadoop's compression API requires that we calculate
41+
* additional buffer space required for the worst case. There is a formula developed for
42+
* gzip that applies as a ballpark to all LZ variants. It should be good enough for now and
43+
* has been tested as such with a range of different inputs.
44+
*/
45+
public static int compressionOverhead(int bufferSize) {
46+
// Given an input buffer of 'bufferSize' bytes we presume a worst case expansion of
47+
// 32 bytes (block header) and an additional 1/6th of the input size.
48+
return (bufferSize / 6) + 32;
49+
}
3850
}

hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public int compress(byte[] b, int off, int len) throws IOException {
5757
if (outBuf.hasRemaining()) {
5858
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
5959
outBuf.get(b, off, n);
60-
LOG.trace("compress: {} bytes from outBuf", n);
60+
LOG.trace("compress: read {} remaining bytes from outBuf", n);
6161
return n;
6262
}
6363
// We don't actually begin compression until our caller calls finish().

hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopDecompressor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public int decompress(byte[] b, int off, int len) throws IOException {
5151
if (outBuf.hasRemaining()) {
5252
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
5353
outBuf.get(b, off, n);
54-
LOG.trace("decompress: {} bytes from outBuf", n);
54+
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
5555
return n;
5656
}
5757
if (inBuf.position() > 0) {

hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.hadoop.conf.Configurable;
2424
import org.apache.hadoop.conf.Configuration;
2525
import org.apache.hadoop.fs.CommonConfigurationKeys;
26+
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
2627
import org.apache.hadoop.io.compress.BlockCompressorStream;
2728
import org.apache.hadoop.io.compress.BlockDecompressorStream;
2829
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -91,8 +92,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
9192
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
9293
throws IOException {
9394
int bufferSize = getBufferSize(conf);
94-
int compressionOverhead = (bufferSize / 6) + 32;
95-
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
95+
return new BlockCompressorStream(out, c, bufferSize,
96+
CompressionUtil.compressionOverhead(bufferSize));
9697
}
9798

9899
@Override
@@ -149,10 +150,9 @@ public class HadoopLz4Decompressor extends HadoopDecompressor<Lz4Decompressor> {
149150
// Package private
150151

151152
static int getBufferSize(Configuration conf) {
152-
int size = conf.getInt(LZ4_BUFFER_SIZE_KEY,
153+
return conf.getInt(LZ4_BUFFER_SIZE_KEY,
153154
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
154155
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
155-
return size > 0 ? size : 256 * 1024; // Don't change this default
156156
}
157157

158158
}

hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.hadoop.conf.Configurable;
2424
import org.apache.hadoop.conf.Configuration;
2525
import org.apache.hadoop.fs.CommonConfigurationKeys;
26+
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
2627
import org.apache.hadoop.io.compress.BlockCompressorStream;
2728
import org.apache.hadoop.io.compress.BlockDecompressorStream;
2829
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -91,8 +92,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
9192
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
9293
throws IOException {
9394
int bufferSize = getBufferSize(conf);
94-
int compressionOverhead = (bufferSize / 6) + 32;
95-
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
95+
return new BlockCompressorStream(out, c, bufferSize,
96+
CompressionUtil.compressionOverhead(bufferSize));
9697
}
9798

9899
@Override
@@ -149,10 +150,9 @@ public class HadoopLzoDecompressor extends HadoopDecompressor<LzoDecompressor> {
149150
// Package private
150151

151152
static int getBufferSize(Configuration conf) {
152-
int size = conf.getInt(LZO_BUFFER_SIZE_KEY,
153+
return conf.getInt(LZO_BUFFER_SIZE_KEY,
153154
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
154155
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT));
155-
return size > 0 ? size : 256 * 1024; // Don't change this default
156156
}
157157

158158
}

hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.hadoop.conf.Configurable;
2424
import org.apache.hadoop.conf.Configuration;
2525
import org.apache.hadoop.fs.CommonConfigurationKeys;
26+
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
2627
import org.apache.hadoop.io.compress.BlockCompressorStream;
2728
import org.apache.hadoop.io.compress.BlockDecompressorStream;
2829
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -91,8 +92,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
9192
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
9293
throws IOException {
9394
int bufferSize = getBufferSize(conf);
94-
int compressionOverhead = (bufferSize / 6) + 32;
95-
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
95+
return new BlockCompressorStream(out, c, bufferSize,
96+
CompressionUtil.compressionOverhead(bufferSize));
9697
}
9798

9899
@Override
@@ -149,10 +150,9 @@ public class HadoopSnappyDecompressor extends HadoopDecompressor<SnappyDecompres
149150
// Package private
150151

151152
static int getBufferSize(Configuration conf) {
152-
int size = conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
153+
return conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
153154
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
154155
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
155-
return size > 0 ? size : 256 * 1024; // Don't change this default
156156
}
157157

158158
}

hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.hadoop.conf.Configurable;
2424
import org.apache.hadoop.conf.Configuration;
2525
import org.apache.hadoop.fs.CommonConfigurationKeys;
26+
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
2627
import org.apache.hadoop.io.compress.BlockCompressorStream;
2728
import org.apache.hadoop.io.compress.BlockDecompressorStream;
2829
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -52,6 +53,7 @@
5253
public class ZstdCodec implements Configurable, CompressionCodec {
5354

5455
public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
56+
public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
5557

5658
private Configuration conf;
5759

@@ -99,8 +101,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
99101
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
100102
throws IOException {
101103
int bufferSize = getBufferSize(conf);
102-
int compressionOverhead = (bufferSize / 6) + 32;
103-
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
104+
return new BlockCompressorStream(out, c, bufferSize,
105+
CompressionUtil.compressionOverhead(bufferSize));
104106
}
105107

106108
@Override
@@ -157,10 +159,10 @@ public class HadoopZstdDecompressor extends HadoopDecompressor<ZstdDecompressor>
157159
// Package private
158160

159161
static int getBufferSize(Configuration conf) {
160-
int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
162+
return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
161163
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
162-
CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
163-
return size > 0 ? size : 256 * 1024; // Don't change this default
164+
// IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that.
165+
ZSTD_BUFFER_SIZE_DEFAULT));
164166
}
165167

166168
}

0 commit comments

Comments
 (0)