Skip to content

Commit cf3a987

Browse files
committed
BATCH-2434 Improve TransactionAwareBufferedWriter
TransactionAwareBufferedWriter offers a number of optimization potentials. First it buffers at the char level. Buffering at the byte level instead saves about 50% memory usage in common cases. Second it does not overwrite any of the #write(String) methods leading to unnecessary intermediate copies. * buffer in a ByteArrayOutputStream instead of StringBuilder * overwrite #write(String) methods to avoid copies Together these two changes should help to reduce both live set size and allocation rate. Issue: BATCH-2434 https://jira.spring.io/browse/BATCH-2434
1 parent b61440d commit cf3a987

File tree

4 files changed

+201
-98
lines changed

4 files changed

+201
-98
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/file/FlatFileItemWriter.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -506,23 +506,13 @@ public void close() {
506506

507507
private void closeStream() {
508508
try {
509-
if (fileChannel != null) {
510-
fileChannel.close();
509+
if (os != null) {
510+
os.close();
511511
}
512512
}
513513
catch (IOException ioe) {
514514
throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
515515
}
516-
finally {
517-
try {
518-
if (os != null) {
519-
os.close();
520-
}
521-
}
522-
catch (IOException ioe) {
523-
throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
524-
}
525-
}
526516
}
527517

528518
/**
@@ -561,7 +551,7 @@ private void initializeBufferedWriter() throws IOException {
561551
os = new FileOutputStream(file.getAbsolutePath(), true);
562552
fileChannel = os.getChannel();
563553

564-
outputBufferedWriter = getBufferedWriter(fileChannel, encoding);
554+
outputBufferedWriter = getBufferedWriter(os, encoding);
565555
outputBufferedWriter.flush();
566556

567557
if (append) {
@@ -591,11 +581,11 @@ public boolean isInitialized() {
591581
* Returns the buffered writer opened to the beginning of the file
592582
* specified by the absolute path name contained in absoluteFileName.
593583
*/
594-
private Writer getBufferedWriter(FileChannel fileChannel, String encoding) {
584+
private Writer getBufferedWriter(FileOutputStream outputStream, String encoding) {
595585
try {
596586
final FileChannel channel = fileChannel;
597587
if (transactional) {
598-
TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(channel, new Runnable() {
588+
TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(outputStream, new Runnable() {
599589
@Override
600590
public void run() {
601591
closeStream();

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/xml/StaxEventItemWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ private void open(long position) {
456456
try {
457457
final FileChannel channel = fileChannel;
458458
if (transactional) {
459-
TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(channel, new Runnable() {
459+
TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(os, new Runnable() {
460460
@Override
461461
public void run() {
462462
closeStream();

spring-batch-infrastructure/src/main/java/org/springframework/batch/support/transaction/TransactionAwareBufferedWriter.java

Lines changed: 98 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@
1515
*/
1616
package org.springframework.batch.support.transaction;
1717

18+
import java.io.BufferedOutputStream;
19+
import java.io.ByteArrayOutputStream;
20+
import java.io.FileOutputStream;
1821
import java.io.IOException;
22+
import java.io.OutputStream;
23+
import java.io.OutputStreamWriter;
1924
import java.io.UnsupportedEncodingException;
2025
import java.io.Writer;
21-
import java.nio.ByteBuffer;
2226
import java.nio.channels.FileChannel;
2327

2428
import org.springframework.batch.item.WriteFailedException;
@@ -41,7 +45,7 @@ public class TransactionAwareBufferedWriter extends Writer {
4145

4246
private final Object closeKey;
4347

44-
private FileChannel channel;
48+
private FileOutputStream outputStream;
4549

4650
private final Runnable closeCallback;
4751

@@ -57,12 +61,12 @@ public class TransactionAwareBufferedWriter extends Writer {
5761
* to execute on close. The callback should clean up related resources like
5862
* output streams or channels.
5963
*
60-
* @param channel channel used to do the actual file IO
64+
* @param outputStream channel used to do the actual file IO
6165
* @param closeCallback callback to execute on close
6266
*/
63-
public TransactionAwareBufferedWriter(FileChannel channel, Runnable closeCallback) {
67+
public TransactionAwareBufferedWriter(FileOutputStream outputStream, Runnable closeCallback) {
6468
super();
65-
this.channel = channel;
69+
this.outputStream = outputStream;
6670
this.closeCallback = closeCallback;
6771
this.bufferKey = new Object();
6872
this.closeKey = new Object();
@@ -71,6 +75,47 @@ public TransactionAwareBufferedWriter(FileChannel channel, Runnable closeCallbac
7175
public void setEncoding(String encoding) {
7276
this.encoding = encoding;
7377
}
78+
79+
/**
80+
* Buffers the output on a {@code byte[]} level. Compared to
81+
* buffering on a {@code char[]} level this saves about 50% memory
82+
* for common encodings and input.
83+
*/
84+
static final class Buffer {
85+
86+
private ByteArrayOutputStream outputStream;
87+
88+
private OutputStreamWriter writer;
89+
90+
Buffer(String encoding) throws UnsupportedEncodingException {
91+
this.outputStream = new ByteArrayOutputStream();
92+
this.writer = new OutputStreamWriter(outputStream, encoding);
93+
}
94+
95+
int length() throws IOException {
96+
this.writer.flush();
97+
return this.outputStream.size();
98+
}
99+
100+
void append(char[] cbuf, int off, int len) throws IOException {
101+
this.writer.write(cbuf, off, len);
102+
}
103+
104+
void append(String str, int off, int len) throws IOException {
105+
this.writer.write(str, off, len);
106+
}
107+
108+
byte[] getBytes() throws IOException {
109+
this.writer.flush();
110+
return this.outputStream.toByteArray();
111+
}
112+
113+
void writeTo(OutputStream stream) throws IOException {
114+
this.writer.flush();
115+
this.outputStream.writeTo(stream);
116+
}
117+
118+
}
74119

75120
/**
76121
* Flag to indicate that changes should be force-synced to disk on flush.
@@ -88,11 +133,15 @@ public void setForceSync(boolean forceSync) {
88133
/**
89134
* @return
90135
*/
91-
private StringBuilder getCurrentBuffer() {
136+
private Buffer getCurrentBuffer() {
92137

93138
if (!TransactionSynchronizationManager.hasResource(bufferKey)) {
94139

95-
TransactionSynchronizationManager.bindResource(bufferKey, new StringBuilder());
140+
try {
141+
TransactionSynchronizationManager.bindResource(bufferKey, new Buffer(this.encoding));
142+
} catch (UnsupportedEncodingException e) {
143+
throw new WriteFailedException("Could not determine buffer size because of unsupported encoding: " + encoding, e);
144+
}
96145

97146
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
98147
@Override
@@ -113,18 +162,11 @@ public void beforeCommit(boolean readOnly) {
113162
}
114163

115164
private void complete() throws IOException {
116-
StringBuilder buffer = (StringBuilder) TransactionSynchronizationManager.getResource(bufferKey);
165+
Buffer buffer = (Buffer) TransactionSynchronizationManager.getResource(bufferKey);
117166
if (buffer != null) {
118-
String string = buffer.toString();
119-
byte[] bytes = string.getBytes(encoding);
120-
int bufferLength = bytes.length;
121-
ByteBuffer bb = ByteBuffer.wrap(bytes);
122-
int bytesWritten = channel.write(bb);
123-
if(bytesWritten != bufferLength) {
124-
throw new IOException("All bytes to be written were not successfully written");
125-
}
167+
buffer.writeTo(outputStream);
126168
if (forceSync) {
127-
channel.force(false);
169+
outputStream.getChannel().force(false);
128170
}
129171
if (TransactionSynchronizationManager.hasResource(closeKey)) {
130172
closeCallback.run();
@@ -145,7 +187,7 @@ private void clear() {
145187

146188
}
147189

148-
return (StringBuilder) TransactionSynchronizationManager.getResource(bufferKey);
190+
return (Buffer) TransactionSynchronizationManager.getResource(bufferKey);
149191

150192
}
151193

@@ -160,9 +202,9 @@ public long getBufferSize() {
160202
return 0L;
161203
}
162204
try {
163-
return getCurrentBuffer().toString().getBytes(encoding).length;
164-
} catch (UnsupportedEncodingException e) {
165-
throw new WriteFailedException("Could not determine buffer size because of unsupported encoding: " + encoding, e);
205+
return getCurrentBuffer().length();
206+
} catch (IOException e) {
207+
throw new WriteFailedException("Could not determine buffer size because of underlying error", e);
166208
}
167209
}
168210

@@ -197,8 +239,30 @@ public void close() throws IOException {
197239
@Override
198240
public void flush() throws IOException {
199241
if (!transactionActive() && forceSync) {
200-
channel.force(false);
242+
outputStream.getChannel().force(false);
243+
}
244+
}
245+
246+
@Override
247+
public void write(String str, int off, int len) throws IOException {
248+
if (!transactionActive()) {
249+
if (len > 8192) {
250+
// avoid creation of large byte[]
251+
BufferedOutputStream bos = new BufferedOutputStream(outputStream);
252+
OutputStreamWriter writer = new OutputStreamWriter(bos, this.encoding);
253+
writer.write(str, off, len);
254+
writer.flush();
255+
// do not call #close(), do not close underlying stream
256+
}
257+
else {
258+
byte[] bytes = str.substring(off, len).getBytes(encoding);
259+
outputStream.write(bytes);
260+
}
261+
return;
201262
}
263+
264+
Buffer buffer = getCurrentBuffer();
265+
buffer.append(str, off, len);
202266
}
203267

204268
/*
@@ -210,19 +274,22 @@ public void flush() throws IOException {
210274
public void write(char[] cbuf, int off, int len) throws IOException {
211275

212276
if (!transactionActive()) {
213-
char [] subArray = new char[len];
214-
System.arraycopy(cbuf, off, subArray, 0, len);
215-
byte[] bytes = new String(subArray).getBytes(encoding);
216-
int length = bytes.length;
217-
ByteBuffer bb = ByteBuffer.wrap(bytes);
218-
int bytesWritten = channel.write(bb);
219-
if(bytesWritten != length) {
220-
throw new IOException("Unable to write all data. Bytes to write: " + len + ". Bytes written: " + bytesWritten);
277+
if (len > 8192) {
278+
// avoid creation of large byte[]
279+
BufferedOutputStream bos = new BufferedOutputStream(outputStream);
280+
OutputStreamWriter writer = new OutputStreamWriter(bos, this.encoding);
281+
writer.write(cbuf, off, len);
282+
writer.flush();
283+
// do not call #close(), do not close underlying stream
284+
}
285+
else {
286+
byte[] bytes = new String(cbuf, off, len).getBytes(encoding);
287+
outputStream.write(bytes);
221288
}
222289
return;
223290
}
224291

225-
StringBuilder buffer = getCurrentBuffer();
292+
Buffer buffer = getCurrentBuffer();
226293
buffer.append(cbuf, off, len);
227294
}
228295
}

0 commit comments

Comments
 (0)