From bccf2d8a20ec920f1a8451637e34318776737d7a Mon Sep 17 00:00:00 2001 From: dan mcweeney Date: Thu, 3 Aug 2017 16:33:14 -0400 Subject: [PATCH] Fixes #24 - Teach Table Generator how to write to a HDFS file. * Make session & scaling serializable so it can be passed around * Teach session factory pattern to take in a hadoop config object * Make a serializable hadoop config object to ensure Session is serializable --- pom.xml | 13 ++ src/main/java/com/teradata/tpcds/Scaling.java | 3 +- src/main/java/com/teradata/tpcds/Session.java | 116 ++++++++++++++++-- .../com/teradata/tpcds/TableGenerator.java | 43 +++++-- 4 files changed, 151 insertions(+), 24 deletions(-) diff --git a/pom.xml b/pom.xml index 894e8ab..24add69 100644 --- a/pom.xml +++ b/pom.xml @@ -237,6 +237,19 @@ javax.inject javax.inject + + org.apache.hadoop + hadoop-common + 2.7.1 + provided + true + + + com.google.code.findbugs + jsr305 + + + diff --git a/src/main/java/com/teradata/tpcds/Scaling.java b/src/main/java/com/teradata/tpcds/Scaling.java index 57933ab..ac45542 100644 --- a/src/main/java/com/teradata/tpcds/Scaling.java +++ b/src/main/java/com/teradata/tpcds/Scaling.java @@ -17,6 +17,7 @@ import com.teradata.tpcds.distribution.CalendarDistribution; import com.teradata.tpcds.type.Date; +import java.io.Serializable; import java.util.EnumMap; import java.util.Map; @@ -39,7 +40,7 @@ import static com.teradata.tpcds.type.Date.fromJulianDays; import static com.teradata.tpcds.type.Date.isLeapYear; -public class Scaling +public class Scaling implements Serializable { private final double scale; private final Map tableToRowCountMap = new EnumMap<>(Table.class); diff --git a/src/main/java/com/teradata/tpcds/Session.java b/src/main/java/com/teradata/tpcds/Session.java index a1477dd..5bbbc9b 100644 --- a/src/main/java/com/teradata/tpcds/Session.java +++ b/src/main/java/com/teradata/tpcds/Session.java @@ -14,6 +14,10 @@ package com.teradata.tpcds; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.io.Serializable; import java.util.Optional; import static com.teradata.tpcds.Options.DEFAULT_DIRECTORY; @@ -26,12 +30,12 @@ import static com.teradata.tpcds.Options.DEFAULT_SEPARATOR; import static com.teradata.tpcds.Options.DEFAULT_SUFFIX; -public class Session +public class Session implements Serializable { private final Scaling scaling; private final String targetDirectory; private final String suffix; - private final Optional table; + private final Table table; private final String nullString; private final char separator; private final boolean doNotTerminate; @@ -39,6 +43,7 @@ public class Session private final int parallelism; private final int chunkNumber; private final boolean overwrite; + private final SerializableHadoopConfiguration hadoopConfig; public Session(double scale, String targetDirectory, String suffix, Optional
table, String nullString, char separator, boolean doNotTerminate, boolean noSexism, int parallelism, boolean overwrite) { @@ -46,11 +51,19 @@ public Session(double scale, String targetDirectory, String suffix, Optional table, String nullString, char separator, boolean doNotTerminate, boolean noSexism, int parallelism, int chunkNumber, boolean overwrite) + { + this(scale, targetDirectory, suffix, table, nullString, separator, doNotTerminate, noSexism, parallelism, + chunkNumber, overwrite, null); + } + + public Session(double scale, String targetDirectory, String suffix, Optional
table, String nullString, + char separator, boolean doNotTerminate, boolean noSexism, int parallelism, int chunkNumber, + boolean overwrite, Configuration hadoopConfig) { this.scaling = new Scaling(scale); this.targetDirectory = targetDirectory; this.suffix = suffix; - this.table = table; + this.table = table.isPresent() ? table.get() : null; this.nullString = nullString; this.separator = separator; this.doNotTerminate = doNotTerminate; @@ -58,6 +71,7 @@ public Session(double scale, String targetDirectory, String suffix, Optional