Skip to content

Commit 8e0646e

Browse files
committed
MAPREDUCE-7494. File stream leak when LineRecordReader is interrupted (#7117)
Contributed by Davin Tjong
1 parent 85b5bd2 commit 8e0646e

File tree

2 files changed

+44
-37
lines changed

2 files changed

+44
-37
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,8 @@ public synchronized void close() throws IOException {
302302
try {
303303
if (in != null) {
304304
in.close();
305+
} else if (fileIn != null) {
306+
fileIn.close();
305307
}
306308
} finally {
307309
if (decompressor != null) {

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java

Lines changed: 42 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -98,48 +98,53 @@ public void initialize(InputSplit genericSplit,
9898
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
9999
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
100100
fileIn = FutureIO.awaitFuture(builder.build());
101-
102-
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
103-
if (null!=codec) {
104-
isCompressedInput = true;
105-
decompressor = CodecPool.getDecompressor(codec);
106-
if (codec instanceof SplittableCompressionCodec) {
107-
final SplitCompressionInputStream cIn =
108-
((SplittableCompressionCodec)codec).createInputStream(
109-
fileIn, decompressor, start, end,
110-
SplittableCompressionCodec.READ_MODE.BYBLOCK);
111-
in = new CompressedSplitLineReader(cIn, job,
112-
this.recordDelimiterBytes);
113-
start = cIn.getAdjustedStart();
114-
end = cIn.getAdjustedEnd();
115-
filePosition = cIn;
116-
} else {
117-
if (start != 0) {
118-
// So we have a split that is only part of a file stored using
119-
// a Compression codec that cannot be split.
120-
throw new IOException("Cannot seek in " +
121-
codec.getClass().getSimpleName() + " compressed stream");
122-
}
123101

124-
in = new SplitLineReader(codec.createInputStream(fileIn,
125-
decompressor), job, this.recordDelimiterBytes);
102+
try {
103+
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
104+
if (null!=codec) {
105+
isCompressedInput = true;
106+
decompressor = CodecPool.getDecompressor(codec);
107+
if (codec instanceof SplittableCompressionCodec) {
108+
final SplitCompressionInputStream cIn =
109+
((SplittableCompressionCodec)codec).createInputStream(
110+
fileIn, decompressor, start, end,
111+
SplittableCompressionCodec.READ_MODE.BYBLOCK);
112+
in = new CompressedSplitLineReader(cIn, job,
113+
this.recordDelimiterBytes);
114+
start = cIn.getAdjustedStart();
115+
end = cIn.getAdjustedEnd();
116+
filePosition = cIn;
117+
} else {
118+
if (start != 0) {
119+
// So we have a split that is only part of a file stored using
120+
// a Compression codec that cannot be split.
121+
throw new IOException("Cannot seek in " +
122+
codec.getClass().getSimpleName() + " compressed stream");
123+
}
124+
125+
in = new SplitLineReader(codec.createInputStream(fileIn,
126+
decompressor), job, this.recordDelimiterBytes);
127+
filePosition = fileIn;
128+
}
129+
} else {
130+
fileIn.seek(start);
131+
in = new UncompressedSplitLineReader(
132+
fileIn, job, this.recordDelimiterBytes, split.getLength());
126133
filePosition = fileIn;
127134
}
128-
} else {
129-
fileIn.seek(start);
130-
in = new UncompressedSplitLineReader(
131-
fileIn, job, this.recordDelimiterBytes, split.getLength());
132-
filePosition = fileIn;
133-
}
134-
// If this is not the first split, we always throw away first record
135-
// because we always (except the last split) read one extra line in
136-
// next() method.
137-
if (start != 0) {
138-
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
135+
// If this is not the first split, we always throw away first record
136+
// because we always (except the last split) read one extra line in
137+
// next() method.
138+
if (start != 0) {
139+
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
140+
}
141+
this.pos = start;
142+
} catch (Exception e) {
143+
fileIn.close();
144+
throw e;
139145
}
140-
this.pos = start;
141146
}
142-
147+
143148

144149
private int maxBytesToConsume(long pos) {
145150
return isCompressedInput

0 commit comments

Comments
 (0)