1414 * See the License for the specific language governing permissions and
1515 * limitations under the License.
1616 */
17- package org .apache .spark .deploy .kubernetes . submit . v1
17+ package org .apache .spark .deploy .kubernetes
1818
19- import java .io .{ByteArrayInputStream , File , FileInputStream , FileOutputStream }
19+ import java .io .{ByteArrayInputStream , File , FileInputStream , FileOutputStream , InputStream , OutputStream }
2020import java .util .zip .{GZIPInputStream , GZIPOutputStream }
2121
2222import com .google .common .io .Files
@@ -48,40 +48,7 @@ private[spark] object CompressionUtils extends Logging {
4848 */
4949 def createTarGzip (paths : Iterable [String ]): TarGzippedData = {
5050 val compressedBytesStream = Utils .tryWithResource(new ByteBufferOutputStream ()) { raw =>
51- Utils .tryWithResource(new GZIPOutputStream (raw)) { gzipping =>
52- Utils .tryWithResource(new TarArchiveOutputStream (
53- gzipping,
54- BLOCK_SIZE ,
55- RECORD_SIZE ,
56- ENCODING )) { tarStream =>
57- val usedFileNames = mutable.HashSet .empty[String ]
58- for (path <- paths) {
59- val file = new File (path)
60- if (! file.isFile) {
61- throw new IllegalArgumentException (s " Cannot add $path to tarball; either does " +
62- s " not exist or is a directory. " )
63- }
64- var resolvedFileName = file.getName
65- val extension = Files .getFileExtension(file.getName)
66- val nameWithoutExtension = Files .getNameWithoutExtension(file.getName)
67- var deduplicationCounter = 1
68- while (usedFileNames.contains(resolvedFileName)) {
69- val oldResolvedFileName = resolvedFileName
70- resolvedFileName = s " $nameWithoutExtension- $deduplicationCounter. $extension"
71- logWarning(s " File with name $oldResolvedFileName already exists. Trying to add " +
72- s " with file name $resolvedFileName instead. " )
73- deduplicationCounter += 1
74- }
75- usedFileNames += resolvedFileName
76- val tarEntry = new TarArchiveEntry (file, resolvedFileName)
77- tarStream.putArchiveEntry(tarEntry)
78- Utils .tryWithResource(new FileInputStream (file)) { fileInput =>
79- IOUtils .copy(fileInput, tarStream)
80- }
81- tarStream.closeArchiveEntry()
82- }
83- }
84- }
51+ writeTarGzipToStream(raw, paths)
8552 raw
8653 }
8754 val compressedAsBase64 = Base64 .encodeBase64String(compressedBytesStream.toByteBuffer.array)
@@ -93,6 +60,44 @@ private[spark] object CompressionUtils extends Logging {
9360 )
9461 }
9562
/**
 * Writes the given files to the provided stream as a gzip-compressed tar archive.
 *
 * Each file is stored under its base name only (no directory component). When several inputs
 * share a base name, later ones are renamed by inserting "-N" before the extension
 * (e.g. a.txt, a-1.txt, a-2.txt) and a warning is logged for each attempt.
 *
 * NOTE(review): Utils.tryWithResource presumably closes the GZIPOutputStream on exit, which
 * would also close `outputStream` -- confirm before reusing the stream after this call.
 *
 * @param outputStream destination for the compressed archive bytes
 * @param paths paths of the regular files to add to the archive
 * @throws IllegalArgumentException if a path does not exist or is not a regular file
 */
def writeTarGzipToStream(outputStream: OutputStream, paths: Iterable[String]): Unit = {
  Utils.tryWithResource(new GZIPOutputStream(outputStream)) { gzipping =>
    Utils.tryWithResource(new TarArchiveOutputStream(
        gzipping,
        BLOCK_SIZE,
        RECORD_SIZE,
        ENCODING)) { tarStream =>
      // Entry names already written, used to detect base-name collisions.
      val usedFileNames = mutable.HashSet.empty[String]
      for (path <- paths) {
        val file = new File(path)
        if (!file.isFile) {
          throw new IllegalArgumentException(s"Cannot add $path to tarball; either does" +
            s" not exist or is a directory.")
        }
        val extension = Files.getFileExtension(file.getName)
        val nameWithoutExtension = Files.getNameWithoutExtension(file.getName)
        // getFileExtension yields "" for extension-less names; only append the dot when there
        // is an extension so that a duplicate "README" becomes "README-1", not "README-1.".
        val extensionSuffix = if (extension.isEmpty) "" else s".$extension"
        var resolvedFileName = file.getName
        var deduplicationCounter = 1
        while (usedFileNames.contains(resolvedFileName)) {
          val oldResolvedFileName = resolvedFileName
          resolvedFileName = s"$nameWithoutExtension-$deduplicationCounter$extensionSuffix"
          logWarning(s"File with name $oldResolvedFileName already exists. Trying to add" +
            s" with file name $resolvedFileName instead.")
          deduplicationCounter += 1
        }
        usedFileNames += resolvedFileName
        // The entry is built from the name alone, so its size must be set explicitly before
        // the file contents are streamed into the archive.
        val tarEntry = new TarArchiveEntry(resolvedFileName)
        tarEntry.setSize(file.length())
        tarStream.putArchiveEntry(tarEntry)
        Utils.tryWithResource(new FileInputStream(file)) { fileInput =>
          IOUtils.copy(fileInput, tarStream)
        }
        tarStream.closeArchiveEntry()
      }
    }
  }
}
100+
96101 /**
97102 * Decompresses the provided tar archive to a directory.
98103 * @param compressedData In-memory representation of the compressed data, ideally created via
@@ -104,7 +109,6 @@ private[spark] object CompressionUtils extends Logging {
104109 def unpackAndWriteCompressedFiles (
105110 compressedData : TarGzippedData ,
106111 rootOutputDir : File ): Seq [String ] = {
107- val paths = mutable.Buffer .empty[String ]
108112 val compressedBytes = Base64 .decodeBase64(compressedData.dataBase64)
109113 if (! rootOutputDir.exists) {
110114 if (! rootOutputDir.mkdirs) {
@@ -116,24 +120,39 @@ private[spark] object CompressionUtils extends Logging {
116120 s " ${rootOutputDir.getAbsolutePath} exists and is not a directory. " )
117121 }
118122 Utils .tryWithResource(new ByteArrayInputStream (compressedBytes)) { compressedBytesStream =>
119- Utils .tryWithResource(new GZIPInputStream (compressedBytesStream)) { gzipped =>
120- Utils .tryWithResource(new TarArchiveInputStream (
121- gzipped,
122- compressedData.blockSize,
123- compressedData.recordSize,
124- compressedData.encoding)) { tarInputStream =>
125- var nextTarEntry = tarInputStream.getNextTarEntry
126- while (nextTarEntry != null ) {
127- val outputFile = new File (rootOutputDir, nextTarEntry.getName)
128- Utils .tryWithResource(new FileOutputStream (outputFile)) { fileOutputStream =>
129- IOUtils .copy(tarInputStream, fileOutputStream)
130- }
131- paths += outputFile.getAbsolutePath
132- nextTarEntry = tarInputStream.getNextTarEntry
123+ unpackTarStreamToDirectory(
124+ compressedBytesStream,
125+ rootOutputDir,
126+ compressedData.blockSize,
127+ compressedData.recordSize,
128+ compressedData.encoding)
129+ }
130+ }
131+
/**
 * Unpacks a gzip-compressed tar stream into the given directory.
 *
 * The stream is wrapped in a GZIPInputStream and then read as a tar archive; every entry is
 * written to a file named after the entry, resolved against `outputDir`.
 *
 * Entries whose resolved location falls outside of `outputDir` (for example names containing
 * ".." segments or absolute paths) are rejected, so an archive cannot overwrite arbitrary
 * files on the host.
 *
 * @param inputStream gzip-compressed tar data
 * @param outputDir directory that receives the unpacked files
 * @param blockSize tar block size handed to the tar reader
 * @param recordSize tar record size handed to the tar reader
 * @param encoding entry-name encoding handed to the tar reader
 * @return absolute paths of the files written, in archive order
 * @throws IllegalArgumentException if an entry would be written outside of `outputDir`
 */
def unpackTarStreamToDirectory(
    inputStream: InputStream,
    outputDir: File,
    blockSize: Int = BLOCK_SIZE,
    recordSize: Int = RECORD_SIZE,
    encoding: String = ENCODING): Seq[String] = {
  val paths = mutable.Buffer.empty[String]
  // Resolved once up front; each entry's canonical location is checked against it.
  val canonicalOutputDir = outputDir.getCanonicalFile.toPath
  Utils.tryWithResource(new GZIPInputStream(inputStream)) { gzipped =>
    Utils.tryWithResource(new TarArchiveInputStream(
        gzipped,
        blockSize,
        recordSize,
        encoding)) { tarInputStream =>
      // getNextTarEntry signals the end of the archive with null; the iterator is lazy, so the
      // next entry is only requested after the current one has been fully copied.
      Iterator.continually(tarInputStream.getNextTarEntry).takeWhile(_ != null).foreach {
        tarEntry =>
          val outputFile = new File(outputDir, tarEntry.getName)
          // Path.startsWith compares whole path components, so "/out-evil" does not pass as
          // being inside "/out".
          if (!outputFile.getCanonicalFile.toPath.startsWith(canonicalOutputDir)) {
            throw new IllegalArgumentException(
              s"Refusing to unpack tar entry ${tarEntry.getName}; it resolves to a location" +
                s" outside of ${outputDir.getAbsolutePath}.")
          }
          Utils.tryWithResource(new FileOutputStream(outputFile)) { fileOutputStream =>
            IOUtils.copy(tarInputStream, fileOutputStream)
          }
          paths += outputFile.getAbsolutePath
      }
    }
  }
  paths
}
139158}
0 commit comments