
Commit b9bb921

Merge branch 'trunk' into dev/HADOOP-16854-oomfix

2 parents: 1cc7f96 + d60496e

72 files changed: +2232 additions, -325 deletions. (Large commit: only a subset of the changed files is shown below.)


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/BytesWritable.java

Lines changed: 2 additions & 1 deletion

@@ -38,6 +38,7 @@
 @InterfaceStability.Stable
 public class BytesWritable extends BinaryComparable
     implements WritableComparable<BinaryComparable> {
+  private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
   private static final int LENGTH_BYTES = 4;
 
   private static final byte[] EMPTY_BYTES = new byte[0];

@@ -126,7 +127,7 @@ public int getSize() {
   public void setSize(int size) {
     if (size > getCapacity()) {
       // Avoid overflowing the int too early by casting to a long.
-      long newSize = Math.min(Integer.MAX_VALUE, (3L * size) / 2L);
+      long newSize = Math.min(MAX_ARRAY_SIZE, (3L * size) / 2L);
       setCapacity((int) newSize);
     }
     this.size = size;
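
The new clamp mirrors the MAX_ARRAY_SIZE idiom in the JDK's own collection classes: some JVMs reserve a few header words inside arrays, so requesting exactly Integer.MAX_VALUE elements can throw OutOfMemoryError even when heap is available. A standalone sketch of the growth arithmetic (illustrative, not part of the patch):

public class GrowthDemo {
  // Largest array size most JVMs will actually allocate; mirrors the
  // constant introduced in this commit.
  private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

  // Grow-by-1.5x policy from BytesWritable.setSize; the 3L widens the
  // multiplication to long so 3 * size cannot overflow int first.
  static int newCapacity(int size) {
    return (int) Math.min(MAX_ARRAY_SIZE, (3L * size) / 2L);
  }

  public static void main(String[] args) {
    System.out.println(newCapacity(1024));              // 1536
    System.out.println(newCapacity(Integer.MAX_VALUE)); // 2147483639, not 2147483647
  }
}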

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java

Lines changed: 16 additions & 0 deletions

@@ -638,6 +638,22 @@ public static String getHostNameOfIP(String ipPort) {
     }
   }
 
+  /**
+   * Attempt to normalize the given string to "host:port"
+   * if it like "ip:port".
+   *
+   * @param ipPort maybe lik ip:port or host:port.
+   * @return host:port
+   */
+  public static String normalizeIP2HostName(String ipPort) {
+    if (null == ipPort || !ipPortPattern.matcher(ipPort).matches()) {
+      return ipPort;
+    }
+
+    InetSocketAddress address = createSocketAddr(ipPort);
+    return getHostPortString(address);
+  }
+
   /**
    * Return hostname without throwing exception.
    * The returned hostname String format is "hostname".
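
A hedged usage sketch of the new helper (the resolved hostnames are assumptions about the local resolver, not output from the patch):

// A numeric ip:port is rewritten to host:port when reverse DNS resolves it;
// anything not shaped like ip:port, including null, passes through unchanged.
String a = NetUtils.normalizeIP2HostName("127.0.0.1:8020");       // e.g. "localhost:8020"
String b = NetUtils.normalizeIP2HostName("nn1.example.com:8020"); // unchanged
String c = NetUtils.normalizeIP2HostName(null);                   // null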

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java

Lines changed: 1 addition & 0 deletions

@@ -949,6 +949,7 @@ public <T extends Node> void sortByDistanceUsingNetworkLocation(Node reader,
    * <p>
    * As an additional twist, we also randomize the nodes at each network
    * distance. This helps with load balancing when there is data skew.
+   * And it helps choose node with more fast storage type.
    *
    * @param reader Node where data will be read
    * @param nodes Available replicas with the requested data

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java

Lines changed: 35 additions & 37 deletions

@@ -126,7 +126,7 @@ private void addPair(T compressor, E decompressor, String name) {
     builder.add(new TesterPair<T, E>(name, compressor, decompressor));
   }
 
-  public void test() throws InstantiationException, IllegalAccessException {
+  public void test() throws Exception {
     pairs = builder.build();
     pairs = assertionDelegate.filterOnAssumeWhat(pairs);
 

@@ -287,47 +287,45 @@ private boolean checkSetInputArrayIndexOutOfBoundsException(
 
       @Override
       public void assertCompression(String name, Compressor compressor,
-          Decompressor decompressor, byte[] rawData) {
+          Decompressor decompressor, byte[] rawData) throws Exception {
 
         int cSize = 0;
         int decompressedSize = 0;
-        byte[] compressedResult = new byte[rawData.length];
+        // Snappy compression can increase data size
+        int maxCompressedLength = 32 + rawData.length + rawData.length/6;
+        byte[] compressedResult = new byte[maxCompressedLength];
         byte[] decompressedBytes = new byte[rawData.length];
-        try {
-          assertTrue(
-              joiner.join(name, "compressor.needsInput before error !!!"),
-              compressor.needsInput());
-          assertTrue(
+        assertTrue(
+            joiner.join(name, "compressor.needsInput before error !!!"),
+            compressor.needsInput());
+        assertEquals(
             joiner.join(name, "compressor.getBytesWritten before error !!!"),
-              compressor.getBytesWritten() == 0);
-          compressor.setInput(rawData, 0, rawData.length);
-          compressor.finish();
-          while (!compressor.finished()) {
-            cSize += compressor.compress(compressedResult, 0,
-                compressedResult.length);
-          }
-          compressor.reset();
-
-          assertTrue(
-              joiner.join(name, "decompressor.needsInput() before error !!!"),
-              decompressor.needsInput());
-          decompressor.setInput(compressedResult, 0, cSize);
-          assertFalse(
-              joiner.join(name, "decompressor.needsInput() after error !!!"),
-              decompressor.needsInput());
-          while (!decompressor.finished()) {
-            decompressedSize = decompressor.decompress(decompressedBytes, 0,
-                decompressedBytes.length);
-          }
-          decompressor.reset();
-          assertTrue(joiner.join(name, " byte size not equals error !!!"),
-              decompressedSize == rawData.length);
-          assertArrayEquals(
-              joiner.join(name, " byte arrays not equals error !!!"), rawData,
-              decompressedBytes);
-        } catch (Exception ex) {
-          fail(joiner.join(name, ex.getMessage()));
+            0, compressor.getBytesWritten());
+        compressor.setInput(rawData, 0, rawData.length);
+        compressor.finish();
+        while (!compressor.finished()) {
+          cSize += compressor.compress(compressedResult, 0,
+              compressedResult.length);
+        }
+        compressor.reset();
+
+        assertTrue(
+            joiner.join(name, "decompressor.needsInput() before error !!!"),
+            decompressor.needsInput());
+        decompressor.setInput(compressedResult, 0, cSize);
+        assertFalse(
+            joiner.join(name, "decompressor.needsInput() after error !!!"),
+            decompressor.needsInput());
+        while (!decompressor.finished()) {
+          decompressedSize = decompressor.decompress(decompressedBytes, 0,
+              decompressedBytes.length);
        }
+        decompressor.reset();
+        assertEquals(joiner.join(name, " byte size not equals error !!!"),
+            rawData.length, decompressedSize);
+        assertArrayEquals(
+            joiner.join(name, " byte arrays not equals error !!!"), rawData,
+            decompressedBytes);
      }
    }),
 

@@ -519,6 +517,6 @@ abstract static class TesterCompressionStrategy {
     protected final Logger logger = Logger.getLogger(getClass());
 
     abstract void assertCompression(String name, Compressor compressor,
-        Decompressor decompressor, byte[] originalRawData);
+        Decompressor decompressor, byte[] originalRawData) throws Exception;
   }
 }
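
The new buffer size is Snappy's documented worst case: incompressible input can grow slightly, and the C++ library defines Snappy::MaxCompressedLength(n) as 32 + n + n/6. A one-method sketch of the bound (illustrative):

// Worst-case Snappy output size; sizing compressedResult with this bound
// guarantees a single compress() call has room for the entire result.
static int maxCompressedLength(int rawLength) {
  return 32 + rawLength + rawLength / 6;
}
// e.g. maxCompressedLength(54 * 1024) == 64544 for a 54 KB input.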

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java

Lines changed: 70 additions & 32 deletions

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.io.compress.snappy;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

@@ -44,11 +45,16 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.Assume.*;
 
 public class TestSnappyCompressorDecompressor {
 
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestSnappyCompressorDecompressor.class);
+
   @Before
   public void before() {
     assumeTrue(SnappyCodec.isNativeCodeLoaded());

@@ -167,40 +173,41 @@ public void testSnappyDecompressorCompressAIOBException() {
   }
 
   @Test
-  public void testSnappyCompressDecompress() {
+  public void testSnappyCompressDecompress() throws Exception {
     int BYTE_SIZE = 1024 * 54;
     byte[] bytes = BytesGenerator.get(BYTE_SIZE);
     SnappyCompressor compressor = new SnappyCompressor();
-    try {
-      compressor.setInput(bytes, 0, bytes.length);
-      assertTrue("SnappyCompressDecompress getBytesRead error !!!",
-          compressor.getBytesRead() > 0);
-      assertTrue(
-          "SnappyCompressDecompress getBytesWritten before compress error !!!",
-          compressor.getBytesWritten() == 0);
-
-      byte[] compressed = new byte[BYTE_SIZE];
-      int cSize = compressor.compress(compressed, 0, compressed.length);
-      assertTrue(
-          "SnappyCompressDecompress getBytesWritten after compress error !!!",
-          compressor.getBytesWritten() > 0);
-
-      SnappyDecompressor decompressor = new SnappyDecompressor(BYTE_SIZE);
-      // set as input for decompressor only compressed data indicated with cSize
-      decompressor.setInput(compressed, 0, cSize);
-      byte[] decompressed = new byte[BYTE_SIZE];
-      decompressor.decompress(decompressed, 0, decompressed.length);
-
-      assertTrue("testSnappyCompressDecompress finished error !!!",
-          decompressor.finished());
-      Assert.assertArrayEquals(bytes, decompressed);
-      compressor.reset();
-      decompressor.reset();
-      assertTrue("decompressor getRemaining error !!!",
-          decompressor.getRemaining() == 0);
-    } catch (Exception e) {
-      fail("testSnappyCompressDecompress ex error!!!");
-    }
+    compressor.setInput(bytes, 0, bytes.length);
+    assertTrue("SnappyCompressDecompress getBytesRead error !!!",
+        compressor.getBytesRead() > 0);
+    assertEquals(
+        "SnappyCompressDecompress getBytesWritten before compress error !!!",
+        0, compressor.getBytesWritten());
+
+    // snappy compression may increase data size.
+    // This calculation comes from "Snappy::MaxCompressedLength(size_t)"
+    int maxSize = 32 + BYTE_SIZE + BYTE_SIZE / 6;
+    byte[] compressed = new byte[maxSize];
+    int cSize = compressor.compress(compressed, 0, compressed.length);
+    LOG.info("input size: {}", BYTE_SIZE);
+    LOG.info("compressed size: {}", cSize);
+    assertTrue(
+        "SnappyCompressDecompress getBytesWritten after compress error !!!",
+        compressor.getBytesWritten() > 0);
+
+    SnappyDecompressor decompressor = new SnappyDecompressor();
+    // set as input for decompressor only compressed data indicated with cSize
+    decompressor.setInput(compressed, 0, cSize);
+    byte[] decompressed = new byte[BYTE_SIZE];
+    decompressor.decompress(decompressed, 0, decompressed.length);
+
+    assertTrue("testSnappyCompressDecompress finished error !!!",
+        decompressor.finished());
+    Assert.assertArrayEquals(bytes, decompressed);
+    compressor.reset();
+    decompressor.reset();
+    assertEquals("decompressor getRemaining error !!!",
+        0, decompressor.getRemaining());
   }
 
   @Test

@@ -278,7 +285,38 @@ public void testSnappyBlockCompression() {
       fail("testSnappyBlockCompression ex error !!!");
     }
   }
-
+
+  @Test
+  // The buffer size is smaller than the input.
+  public void testSnappyCompressDecompressWithSmallBuffer() throws Exception {
+    int inputSize = 1024 * 50;
+    int bufferSize = 512;
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    byte[] buffer = new byte[bufferSize];
+    byte[] input = BytesGenerator.get(inputSize);
+
+    SnappyCompressor compressor = new SnappyCompressor();
+    compressor.setInput(input, 0, inputSize);
+    compressor.finish();
+    while (!compressor.finished()) {
+      int len = compressor.compress(buffer, 0, buffer.length);
+      out.write(buffer, 0, len);
+    }
+    byte[] compressed = out.toByteArray();
+    assertThat(compressed).hasSizeGreaterThan(0);
+    out.reset();
+
+    SnappyDecompressor decompressor = new SnappyDecompressor();
+    decompressor.setInput(compressed, 0, compressed.length);
+    while (!decompressor.finished()) {
+      int len = decompressor.decompress(buffer, 0, buffer.length);
+      out.write(buffer, 0, len);
+    }
+    byte[] decompressed = out.toByteArray();
+
+    assertThat(decompressed).isEqualTo(input);
+  }
+
   private void compressDecompressLoop(int rawDataSize) throws IOException {
     byte[] rawData = BytesGenerator.get(rawDataSize);
     byte[] compressedResult = new byte[rawDataSize+20];

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java

Lines changed: 1 addition & 1 deletion

@@ -40,7 +40,7 @@
 import static org.junit.Assert.*;
 
 /**
- * This tests timout out from SocketInputStream and
+ * This tests timeout out from SocketInputStream and
  * SocketOutputStream using pipes.
  *
  * Normal read and write using these streams are tested by pretty much

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

Lines changed: 2 additions & 1 deletion

@@ -3556,7 +3556,8 @@ public RemoteIterator<OpenFileEntry> listOpenFiles(
 
   public RemoteIterator<OpenFileEntry> listOpenFiles(
       EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
-    return dfs.listOpenFiles(openFilesTypes, path);
+    Path absF = fixRelativePart(new Path(path));
+    return dfs.listOpenFiles(openFilesTypes, getPathName(absF));
   }
 
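With this change, a relative path handed to listOpenFiles is resolved against the client's working directory before it reaches the NameNode, consistent with other DistributedFileSystem methods. A hedged sketch (the working directory and paths are illustrative):

// Assuming fs is a DistributedFileSystem whose working directory is /user/alice:
RemoteIterator<OpenFileEntry> it =
    fs.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES), "logs");
// "logs" now resolves to /user/alice/logs; before the fix it was sent verbatim.
while (it.hasNext()) {
  System.out.println(it.next().getFilePath());
}
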
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java

Lines changed: 1 addition & 1 deletion

@@ -343,7 +343,7 @@ interface Failover {
         PREFIX + "connection.retries.on.timeouts";
     int CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
     String RANDOM_ORDER = PREFIX + "random.order";
-    boolean RANDOM_ORDER_DEFAULT = false;
+    boolean RANDOM_ORDER_DEFAULT = true;
     String RESOLVE_ADDRESS_NEEDED_KEY = PREFIX + "resolve-needed";
    boolean RESOLVE_ADDRESS_NEEDED_DEFAULT = false;
    String RESOLVE_SERVICE_KEY = PREFIX + "resolver.impl";
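
Flipping the default means HA clients now shuffle the configured NameNode proxies instead of always probing them in declaration order. A hedged sketch for restoring the old behavior on one client, assuming Failover.PREFIX expands to "dfs.client.failover.":

// Pin the pre-change deterministic proxy ordering.
Configuration conf = new Configuration();
conf.setBoolean("dfs.client.failover.random.order", false);
FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf);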

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java

Lines changed: 2 additions & 1 deletion

@@ -27,6 +27,7 @@
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
+import java.nio.channels.UnresolvedAddressException;
 import java.util.List;
 
 import com.google.common.io.ByteArrayDataOutput;

@@ -823,7 +824,7 @@ private BlockReaderPeer nextTcpPeer() throws IOException {
           datanode);
       LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer);
       return new BlockReaderPeer(peer, false);
-    } catch (IOException e) {
+    } catch (IOException | UnresolvedAddressException e) {
       LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to"
          + "{}", datanode);
      throw e;
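
The widened catch matters because UnresolvedAddressException is unchecked (it extends IllegalArgumentException), so it sails straight past a catch (IOException); the multi-catch lets nextTcpPeer log the failing datanode before rethrowing. A standalone illustration of the behavior being handled:

import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

public class UnresolvedDemo {
  public static void main(String[] args) throws Exception {
    // Deliberately unresolved address, as happens when a DNS lookup fails.
    InetSocketAddress bad =
        InetSocketAddress.createUnresolved("no-such-host.invalid", 9866);
    try (SocketChannel ch = SocketChannel.open()) {
      ch.connect(bad); // throws UnresolvedAddressException, not IOException
    } catch (UnresolvedAddressException e) {
      System.out.println("caught unchecked exception: " + e);
    }
  }
}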

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java

Lines changed: 1 addition & 1 deletion

@@ -158,7 +158,7 @@ public ExtendedBlock getBlock() {
   * {@link org.apache.hadoop.hdfs.protocol.LocatedBlock#updateCachedStorageInfo}
   * to update the cached Storage ID/Type arrays.
   */
-  public DatanodeInfo[] getLocations() {
+  public DatanodeInfoWithStorage[] getLocations() {
    return locs;
  }
 
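Because DatanodeInfoWithStorage extends DatanodeInfo, narrowing the return type is source-compatible for existing callers, while new callers reach storage-level details without a cast. A hedged sketch (accessor names assumed from the class's purpose):

// locatedBlock is any LocatedBlock obtained from a block-location lookup.
DatanodeInfoWithStorage[] locs = locatedBlock.getLocations();
for (DatanodeInfoWithStorage loc : locs) {
  System.out.println(loc.getStorageID() + " -> " + loc.getStorageType());
}
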