Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,19 @@
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1</version>
<scope>provided</scope>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<profiles>
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/teradata/tpcds/Scaling.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.teradata.tpcds.distribution.CalendarDistribution;
import com.teradata.tpcds.type.Date;

import java.io.Serializable;
import java.util.EnumMap;
import java.util.Map;

Expand All @@ -39,7 +40,7 @@
import static com.teradata.tpcds.type.Date.fromJulianDays;
import static com.teradata.tpcds.type.Date.isLeapYear;

public class Scaling
public class Scaling implements Serializable
{
private final double scale;
private final Map<Table, Long> tableToRowCountMap = new EnumMap<>(Table.class);
Expand Down
116 changes: 103 additions & 13 deletions src/main/java/com/teradata/tpcds/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

package com.teradata.tpcds;

import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;

import static com.teradata.tpcds.Options.DEFAULT_DIRECTORY;
Expand All @@ -26,38 +30,48 @@
import static com.teradata.tpcds.Options.DEFAULT_SEPARATOR;
import static com.teradata.tpcds.Options.DEFAULT_SUFFIX;

public class Session
public class Session implements Serializable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implements should be on a newline to match the style of the rest of the project. (I thought that would be caught by checkstyle, but I guess not)

{
private final Scaling scaling;
private final String targetDirectory;
private final String suffix;
private final Optional<Table> table;
private final Table table;
private final String nullString;
private final char separator;
private final boolean doNotTerminate;
private final boolean noSexism;
private final int parallelism;
private final int chunkNumber;
private final boolean overwrite;
private final SerializableHadoopConfiguration hadoopConfig;

// Convenience constructor: defaults to chunk number 1 (single-chunk generation).
public Session(double scale, String targetDirectory, String suffix, Optional<Table> table, String nullString, char separator, boolean doNotTerminate, boolean noSexism, int parallelism, boolean overwrite)
{
this(scale, targetDirectory, suffix, table, nullString, separator, doNotTerminate, noSexism, parallelism, 1, overwrite);
}

// Delegating constructor: no Hadoop configuration is supplied, so output goes
// through the default-Configuration path. One argument per line for readability.
public Session(double scale, String targetDirectory, String suffix, Optional<Table> table, String nullString, char separator, boolean doNotTerminate, boolean noSexism, int parallelism, int chunkNumber, boolean overwrite)
{
    this(
            scale,
            targetDirectory,
            suffix,
            table,
            nullString,
            separator,
            doNotTerminate,
            noSexism,
            parallelism,
            chunkNumber,
            overwrite,
            null);
}

/**
 * Primary constructor.
 *
 * @param table        table to generate, or an empty Optional to generate all tables
 * @param hadoopConfig Hadoop configuration used when writing to a Hadoop
 *                     filesystem; may be null, in which case the serializable
 *                     wrapper substitutes a default {@code Configuration}
 */
public Session(double scale, String targetDirectory, String suffix, Optional<Table> table, String nullString,
               char separator, boolean doNotTerminate, boolean noSexism, int parallelism, int chunkNumber,
               boolean overwrite, Configuration hadoopConfig)
{
    this.scaling = new Scaling(scale);
    this.targetDirectory = targetDirectory;
    this.suffix = suffix;
    // Stored as a plain nullable field (not Optional) so Session stays serializable.
    this.table = table.orElse(null);
    this.nullString = nullString;
    this.separator = separator;
    this.doNotTerminate = doNotTerminate;
    this.noSexism = noSexism;
    this.parallelism = parallelism;
    this.chunkNumber = chunkNumber;
    this.overwrite = overwrite;
    // NOTE(review): the wrapper replaces a null argument with a default
    // Configuration, so getHadoopConfig() can never return null -- callers that
    // branch on null (e.g. TableGenerator) will always take the Hadoop path.
    // Confirm this is the intended behavior.
    this.hadoopConfig = new SerializableHadoopConfiguration(hadoopConfig);
}

public static Session getDefaultSession()
Expand All @@ -71,7 +85,7 @@ public Session withTable(Table table)
this.scaling.getScale(),
this.targetDirectory,
this.suffix,
Optional.of(table),
Optional.ofNullable(table),
this.nullString,
this.separator,
this.doNotTerminate,
Expand All @@ -88,7 +102,7 @@ public Session withScale(double scale)
scale,
this.targetDirectory,
this.suffix,
this.table,
Optional.ofNullable(this.table),
this.nullString,
this.separator,
this.doNotTerminate,
Expand All @@ -105,7 +119,7 @@ public Session withParallelism(int parallelism)
this.scaling.getScale(),
this.targetDirectory,
this.suffix,
this.table,
Optional.ofNullable(this.table),
this.nullString,
this.separator,
this.doNotTerminate,
Expand All @@ -122,7 +136,7 @@ public Session withChunkNumber(int chunkNumber)
this.scaling.getScale(),
this.targetDirectory,
this.suffix,
this.table,
Optional.ofNullable(this.table),
this.nullString,
this.separator,
this.doNotTerminate,
Expand All @@ -139,7 +153,7 @@ public Session withNoSexism(boolean noSexism)
this.scaling.getScale(),
this.targetDirectory,
this.suffix,
this.table,
Optional.ofNullable(this.table),
this.nullString,
this.separator,
this.doNotTerminate,
Expand All @@ -150,6 +164,42 @@ public Session withNoSexism(boolean noSexism)
);
}

public Session withHadoop(Configuration hadoopConfig)
{
return new Session(
this.scaling.getScale(),
this.targetDirectory,
this.suffix,
Optional.ofNullable(this.table),
this.nullString,
this.separator,
this.doNotTerminate,
noSexism,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.noSexism to match the rest of the arguments

this.parallelism,
this.chunkNumber,
this.overwrite,
hadoopConfig
);
}

// Returns a copy of this session that writes to the given target directory,
// preserving every other setting (including the wrapped Hadoop configuration).
public Session withTargetDirectory(String targetDirectory)
{
return new Session(
this.scaling.getScale(),
targetDirectory,
this.suffix,
Optional.ofNullable(this.table),
this.nullString,
this.separator,
this.doNotTerminate,
this.noSexism,
this.parallelism,
this.chunkNumber,
this.overwrite,
this.hadoopConfig.get()
);
}

public Scaling getScaling()
{
return scaling;
Expand All @@ -167,15 +217,15 @@ public String getSuffix()

public boolean generateOnlyOneTable()
{
return table.isPresent();
return Optional.ofNullable(table).isPresent();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to convert to an optional here now, just check if it's not null

}

public Table getOnlyTableToGenerate()
{
if (!table.isPresent()) {
if (!Optional.ofNullable(table).isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same. Just check for null.

throw new TpcdsException("table not present");
}
return table.get();
return Optional.ofNullable(table).get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

}

public String getNullString()
Expand Down Expand Up @@ -213,6 +263,11 @@ public boolean shouldOverwrite()
return overwrite;
}

public Configuration getHadoopConfig()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have this return an optional

{
return this.hadoopConfig.get();
}

public String getCommandLineArguments()
{
StringBuilder output = new StringBuilder();
Expand All @@ -225,8 +280,8 @@ public String getCommandLineArguments()
if (!suffix.equals(DEFAULT_SUFFIX)) {
output.append("--suffix ").append(suffix).append(" ");
}
if (table.isPresent()) {
output.append("--table ").append(table.get().getName()).append(" ");
if (Optional.ofNullable(table).isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just check for null

output.append("--table ").append(Optional.ofNullable(table).get().getName()).append(" ");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just do table.getName()

}
if (!nullString.equals(DEFAULT_NULL_STRING)) {
output.append("--null ").append(nullString).append(" ");
Expand Down Expand Up @@ -255,3 +310,38 @@ public String getCommandLineArguments()
return output.toString();
}
}

class SerializableHadoopConfiguration implements Serializable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be private?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implements on a new line

{
Configuration conf;

public SerializableHadoopConfiguration(Configuration hadoopConf)
{
this.conf = hadoopConf;

if (this.conf == null) {
this.conf = new Configuration();
}
}

public SerializableHadoopConfiguration()
{
this.conf = new Configuration();
}

public Configuration get()
{
return this.conf;
}

private void writeObject(java.io.ObjectOutputStream out) throws IOException
{
this.conf.write(out);
}

private void readObject(java.io.ObjectInputStream in) throws IOException
{
this.conf = new Configuration();
this.conf.readFields(in);
}
}
43 changes: 33 additions & 10 deletions src/main/java/com/teradata/tpcds/TableGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@

package com.teradata.tpcds;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -65,19 +70,37 @@ private OutputStreamWriter addFileWriterForTable(Table table)
throws IOException
{
String path = getPath(table);
File file = new File(path);
boolean newFileCreated = file.createNewFile();
if (!newFileCreated) {
if (session.shouldOverwrite()) {
// truncate the file
new FileOutputStream(path).close();
}
else {
throw new TpcdsException(format("File %s exists. Remove it or run with the '--overwrite' option", path));
if (session.getHadoopConfig() == null) {
File file = new File(path);
boolean newFileCreated = file.createNewFile();
if (!newFileCreated) {
if (session.shouldOverwrite()) {
// truncate the file
new FileOutputStream(path).close();
}
else {
throw new TpcdsException(format("File %s exists. Remove it or run with the '--overwrite' option", path));
}
}
return new OutputStreamWriter(new FileOutputStream(path, true), StandardCharsets.ISO_8859_1);
}
try {
FileSystem fs = FileSystem.get(new URI(session.getTargetDirectory()), session.getHadoopConfig());

return new OutputStreamWriter(new FileOutputStream(path, true), StandardCharsets.ISO_8859_1);
Path hadoopPath = new Path(path);
if (fs.exists(hadoopPath)) {
if (session.shouldOverwrite()) {
fs.delete(hadoopPath, true);
}
else {
throw new TpcdsException(format("File %s exists. Remove it or run with the '--overwrite' option", path));
}
}
return new OutputStreamWriter(fs.create(hadoopPath, true, 4096), StandardCharsets.ISO_8859_1);
}
catch (final URISyntaxException uriEx) {
throw new TpcdsException("Could not create URI for connecting to Hadoop:" + uriEx.getMessage());
}
}

private String getPath(Table table)
Expand Down