diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
new file mode 100644
index 0000000..43a1ea7
--- /dev/null
+++ b/.github/workflows/maven.yml
@@ -0,0 +1,17 @@
+name: Java CI
+
+on: [push]
+
+jobs:
+ build:
+
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v1
+ - name: Set up JDK 1.8
+ uses: actions/setup-java@v1
+ with:
+ java-version: 1.8
+ - name: Build with Maven
+ run: mvn -B package --file pom.xml
\ No newline at end of file
diff --git a/README.md b/README.md
index 1f41fb4..2730aa6 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# bireme
+# bireme -- (based on https://github.com/HashDataInc/bireme, extended to support DDL operations)
[![Build Status](https://travis-ci.org/HashDataInc/bireme.svg?branch=master)](https://travis-ci.org/HashDataInc/bireme)
diff --git a/README_zh-cn.md b/README_zh-cn.md
index c4ef3e9..47d3816 100644
--- a/README_zh-cn.md
+++ b/README_zh-cn.md
@@ -1,4 +1,4 @@
-# bireme
+# bireme -- (基于 https://github.com/HashDataInc/bireme 改造,可支持DDL操作)
[![Build Status](https://travis-ci.org/HashDataInc/bireme.svg?branch=master)](https://travis-ci.org/HashDataInc/bireme)
diff --git a/etc/config.properties b/etc/config.properties
index 4ce3633..718d9de 100644
--- a/etc/config.properties
+++ b/etc/config.properties
@@ -1,26 +1,24 @@
# target database that the data will be synced into.
-target.url = jdbc:postgresql://127.0.0.1:5432/postgres
-target.user = postgres
-target.passwd = postgres
-
-# data source name list, separated by comma.
-data_source = maxwell1, debezium1
-
-# data source "mysql1" type
-maxwell1.type = maxwell
+target.url = jdbc:postgresql://172.16.101.93:5432/songyc5_test?reWriteBatchedInserts=true
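+# reWriteBatchedInserts=true lets the PostgreSQL JDBC driver rewrite batched
+# INSERTs into multi-row statements, which speeds up bulk loading.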
+target.user = gpadmin
+target.passwd = gpadmin
+#target.url = jdbc:postgresql://172.16.101.19:5432/postgres
+#target.user = gpadmin
+#target.passwd = gpadmin
+
+data_source = mysql_ddl
+#mysql_dml.type = maxwell
+mysql_ddl.type = maxwell
# kafka server that maxwell writes the binlog to.
-maxwell1.kafka.server = 127.0.0.1:9092
+#mysql_dml.kafka.server = localhost:9092
+mysql_ddl.kafka.server = localhost:9092
# kafka topic that maxwell writes the binlog to.
-maxwell1.kafka.topic = topic_name1
-# kafka groupid used for consumer.
-maxwell1.kafka.groupid = bireme
-
-# data source "debezium1"
-debezium1.type = debezium
-# kafka server which debezium write into.
-debezium1.kafka.server = 127.0.0.1:9092
+#mysql_dml.kafka.topic=namespace_%{database}_%{table}
+#mysql_dml.kafka.topic=maxwell_dml
+mysql_ddl.kafka.topic=maxwell_ddl
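+# A topic may embed %{database} and %{table} placeholders; bireme expands them
+# into one concrete topic per mapped table (see Bireme.createPipeLine).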
# kafka groupid used for consumer.
-debezium1.kafka.groupid = bireme
+#mysql_dml.kafka.groupid=bireme
+#mysql_ddl.kafka.groupid=bireme
# number of threads used for pipeline to drive the process
pipeline.thread_pool.size = 5
diff --git a/etc/debezium1.properties b/etc/debezium1.properties
deleted file mode 100644
index 6a86110..0000000
--- a/etc/debezium1.properties
+++ /dev/null
@@ -1,2 +0,0 @@
-# source table full name = target table full name
-public.source = public.target
\ No newline at end of file
diff --git a/etc/maxwell1.properties b/etc/maxwell1.properties
deleted file mode 100644
index 63256f5..0000000
--- a/etc/maxwell1.properties
+++ /dev/null
@@ -1,3 +0,0 @@
-# source table full name = target table full name
-demo.table1 = public.table1
-#demo.table2 = public.table2
diff --git a/etc/mysql_dml.properties b/etc/mysql_dml.properties
new file mode 100644
index 0000000..472b6c6
--- /dev/null
+++ b/etc/mysql_dml.properties
@@ -0,0 +1 @@
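+# source table full name = target table full name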
+demo.test1 = public.test1
diff --git a/pom.xml b/pom.xml
index 802f562..48d6e17 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,11 @@
       <version>4.12</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+      <version>1.16.20</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
@@ -142,7 +147,8 @@
-
+
+
org.apache.maven.plugins
maven-javadoc-plugin
@@ -197,7 +203,7 @@
-
org.eclipse.m2e
@@ -232,29 +238,4 @@
-
-  <reporting>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-report-plugin</artifactId>
-        <version>2.17</version>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <version>2.12.1</version>
-        <configuration>
-          <configLocation>${basedir}/src/main/resources/checkstyle.xml</configLocation>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
-        <version>3.0.4</version>
-      </plugin>
-    </plugins>
-  </reporting>
diff --git a/src/main/java/cn/hashdata/bireme/Bireme.java b/src/main/java/cn/hashdata/bireme/Bireme.java
index 1268cf7..cc130a0 100644
--- a/src/main/java/cn/hashdata/bireme/Bireme.java
+++ b/src/main/java/cn/hashdata/bireme/Bireme.java
@@ -14,6 +14,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -50,317 +51,353 @@
* / Hashdata database.
*
* @author yuze
- *
*/
public class Bireme implements Daemon {
- static final protected Long TIMEOUT_MS = 1000L;
- private static final String DEFAULT_CONFIG_FILE = "etc/config.properties";
- private DaemonContext context;
- private Context cxt;
-
- private Logger logger = LogManager.getLogger("Bireme");
-
- private ConsoleReporter consoleReporter;
- private JmxReporter jmxReporter;
-
- protected void parseCommandLine(String[] args)
- throws DaemonInitException, ConfigurationException, BiremeException {
- Option help = new Option("help", "print this message");
- Option configFile =
- Option.builder("config_file").hasArg().argName("file").desc("config file location").build();
-
- Options opts = new Options();
- opts.addOption(help);
- opts.addOption(configFile);
- CommandLine cmd = null;
- CommandLineParser parser = new DefaultParser();
-
- try {
- cmd = parser.parse(opts, args);
-
- if (cmd.hasOption("help")) {
- throw new ParseException("print help message");
- }
- } catch (ParseException e) {
- HelpFormatter formatter = new HelpFormatter();
- StringWriter out = new StringWriter();
- PrintWriter writer = new PrintWriter(out);
- formatter.printHelp(writer, formatter.getWidth(), "Bireme", null, opts,
- formatter.getLeftPadding(), formatter.getDescPadding(), null, true);
- writer.flush();
- String result = out.toString();
- throw new DaemonInitException(result);
- }
-
- String config = cmd.getOptionValue("config_file", DEFAULT_CONFIG_FILE);
-
- cxt = new Context(new Config(config));
- }
-
- /**
- * Get metadata about the table from target database.
- *
- * @throws BiremeException when fail to connect or get the metadata
- */
- protected void getTableInfo() throws BiremeException {
- logger.info("Start getting metadata of target tables from target database.");
-
- Map> tableInfoMap = null;
+ static final protected Long TIMEOUT_MS = 1000L;
+ private static final String DEFAULT_CONFIG_FILE = "etc/config.properties";
+ private DaemonContext context;
+ private Context cxt;
+
+ private Logger logger = LogManager.getLogger("Bireme");
+
+ private ConsoleReporter consoleReporter;
+ private JmxReporter jmxReporter;
+
+ private boolean isDDL = false;
+
+ /**
+ * Parse the command line arguments.
+ * @param args command line arguments
+ * @throws DaemonInitException when daemon initialization fails
+ * @throws ConfigurationException when the configuration is invalid
+ * @throws BiremeException on bireme-specific errors
+ */
+ protected void parseCommandLine(String[] args)
+ throws DaemonInitException, ConfigurationException, BiremeException {
+ Option help = new Option("help", "print this message");
+ Option configFile =
+ Option.builder("config_file").hasArg().argName("file").desc("config file location").build();
+
+ Options opts = new Options();
+ opts.addOption(help);
+ opts.addOption(configFile);
+ CommandLine cmd = null;
+ CommandLineParser parser = new DefaultParser();
- String[] strArray;
- Connection conn = BiremeUtility.jdbcConn(cxt.conf.targetDatabase);
-
- try {
- tableInfoMap = GetPrimaryKeys.getPrimaryKeys(cxt.tableMap, conn);
- } catch (Exception e) {
- String message = "error occurs in this way! ";
- throw new BiremeException(message, e);
- }
-
- for (String fullname : cxt.tableMap.values()) {
- if (cxt.tablesInfo.containsKey(fullname)) {
- continue;
- }
+ try {
+ cmd = parser.parse(opts, args);
+
+ if (cmd.hasOption("help")) {
+ throw new ParseException("print help message");
+ }
+ } catch (ParseException e) {
+ HelpFormatter formatter = new HelpFormatter();
+ StringWriter out = new StringWriter();
+ PrintWriter writer = new PrintWriter(out);
+ formatter.printHelp(writer, formatter.getWidth(), "Bireme", null, opts,
+ formatter.getLeftPadding(), formatter.getDescPadding(), null, true);
+ writer.flush();
+ String result = out.toString();
+ throw new DaemonInitException(result);
+ }
- strArray = fullname.split("\\.");
- cxt.tablesInfo.put(fullname, new Table(strArray[1], tableInfoMap, conn));
- }
+ String config = cmd.getOptionValue("config_file", DEFAULT_CONFIG_FILE);
- try {
- conn.close();
- } catch (SQLException ignore) {
+ cxt = new Context(new Config(config));
}
- logger.info("Finish getting metadata of target tables from target database.");
- }
+ /**
+ * Get metadata about the table from target database.
+ *
+ * @throws BiremeException when fail to connect or get the metadata
+ */
+ protected void getTableInfo() throws BiremeException {
+ logger.info("Start getting metadata of target tables from target database.");
- /**
- * Establish connections to the target database.
- *
- * @throws BiremeException fail to connect
- */
- protected void initLoaderConnections() throws BiremeException {
- logger.info("Start establishing connections for loaders.");
+ Map> tableInfoMap = null;
- LinkedBlockingQueue conns = cxt.loaderConnections;
- HashMap> temporatyTables = cxt.temporaryTables;
- Connection conn = null;
+ String[] strArray;
+ Connection conn = BiremeUtility.jdbcConn(cxt.conf.targetDatabase);
- try {
- for (int i = 0, number = cxt.conf.loader_conn_size; i < number; i++) {
- conn = BiremeUtility.jdbcConn(cxt.conf.targetDatabase);
- conn.setAutoCommit(true);
- Statement stmt = conn.createStatement();
+ try {
+ tableInfoMap = GetPrimaryKeys.getPrimaryKeys(cxt.tableMap, conn);
+ } catch (Exception e) {
+ String message = "error occurs in this way! ";
+ throw new BiremeException(message, e);
+ }
- stmt.execute("set enable_nestloop = on;");
- stmt.execute("set enable_seqscan = off;");
- stmt.execute("set enable_hashjoin = off;");
+ for (String fullname : cxt.tableMap.values()) {
+ if (cxt.tablesInfo.containsKey(fullname)) {
+ continue;
+ }
- try {
- stmt.execute("set gp_autostats_mode = none;");
- } catch (SQLException ignore) {
+ strArray = fullname.split("\\.");
+ cxt.tablesInfo.put(fullname, new Table(strArray[1], tableInfoMap, conn));
}
- conn.setAutoCommit(false);
- conns.add(conn);
- temporatyTables.put(conn, new HashSet());
- }
- } catch (SQLException e) {
- for (Connection closeConn : temporatyTables.keySet()) {
try {
- closeConn.close();
+ conn.close();
} catch (SQLException ignore) {
}
- }
- throw new BiremeException("Could not establish connection to target database.", e);
+ logger.info("Finish getting metadata of target tables from target database.");
}
- logger.info("Finishing establishing {} connections for loaders.", cxt.conf.loader_conn_size);
- }
+ /**
+ * Establish connections to the target database.
+ *
+ * @throws BiremeException fail to connect
+ */
+ protected void initLoaderConnections() throws BiremeException {
+ logger.info("Start establishing connections for loaders.");
+
+ LinkedBlockingQueue<Connection> conns = cxt.loaderConnections;
+ HashMap<Connection, HashSet<String>> temporaryTables = cxt.temporaryTables;
+ Connection conn = null;
+
+ try {
+ for (int i = 0, number = cxt.conf.loader_conn_size; i < number; i++) {
+ conn = BiremeUtility.jdbcConn(cxt.conf.targetDatabase);
+ conn.setAutoCommit(true);
+ Statement stmt = conn.createStatement();
+
+ stmt.execute("set enable_nestloop = on;");
+ stmt.execute("set enable_seqscan = off;");
+ stmt.execute("set enable_hashjoin = off;");
+
+ try {
+ stmt.execute("set gp_autostats_mode = none;");
+ } catch (SQLException ignore) {
+ }
+
+ conn.setAutoCommit(false);
+ conns.add(conn);
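+ // DDL pipelines do not stage rows through temporary tables, so skip the bookkeeping.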
+ if (this.isDDL) {
+ continue;
+ }
+ temporaryTables.put(conn, new HashSet<String>());
+ }
+ } catch (SQLException e) {
+ for (Connection closeConn : temporaryTables.keySet()) {
+ try {
+ closeConn.close();
+ } catch (SQLException ignore) {
+ }
+ }
+
+ throw new BiremeException("Could not establish connection to target database.", e);
+ }
- protected void closeConnections() throws SQLException {
- for (Connection conn : cxt.loaderConnections) {
- conn.close();
+ logger.info("Finishing establishing {} connections for loaders.", cxt.conf.loader_conn_size);
}
- }
-
- protected void createPipeLine() throws BiremeException {
- for (SourceConfig conf : cxt.conf.sourceConfig.values()) {
- switch (conf.type) {
- case MAXWELL:
- KafkaConsumer consumer =
- KafkaPipeLine.createConsumer(conf.server, conf.groupID);
- Iterator iter = consumer.partitionsFor(conf.topic).iterator();
-
- int num = 0;
- while (iter.hasNext()) {
- iter.next();
- num++;
- PipeLine pipeLine = new MaxwellPipeLine(cxt, conf, num);
- cxt.pipeLines.add(pipeLine);
- conf.pipeLines.add(pipeLine);
- }
- break;
-
- case DEBEZIUM:
- for (String sourceTable : conf.tableMap.keySet()) {
- String topic = conf.topic + sourceTable.substring(sourceTable.indexOf("."));
- PipeLine pipeLine = new DebeziumPipeLine(cxt, conf, topic);
- cxt.pipeLines.add(pipeLine);
- conf.pipeLines.add(pipeLine);
- }
- break;
-
- default:
- break;
- }
+
+ protected void closeConnections() throws SQLException {
+ for (Connection conn : cxt.loaderConnections) {
+ conn.close();
+ }
}
- // pipeline state statistics
- Gauge allCount = new Gauge() {
- @Override
- public Integer getValue() {
- return cxt.pipeLines.size();
- }
- };
-
- Gauge liveCount = new Gauge() {
- @Override
- public Integer getValue() {
- int live = 0;
- for (PipeLine pipeline : cxt.pipeLines) {
- if (pipeline.state == PipeLineState.NORMAL) {
- live++;
- }
+ /**
+ * Create the pipelines for every configured data source.
+ * @throws BiremeException when pipeline creation fails
+ */
+ protected void createPipeLine() throws BiremeException {
+ for (SourceConfig conf : cxt.conf.sourceConfig.values()) {
+ switch (conf.type) {
+ case MAXWELL:
+ KafkaConsumer<String, String> consumer =
+ KafkaPipeLine.createConsumer(conf.server, conf.groupID);
+
+ Set<String> topicSet = new HashSet<String>();
+ // Expand the %{database} and %{table} placeholders in the topic name.
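+ // e.g. with mapping key "mysql_dml.demo.test1", the topic
+ // "namespace_%{database}_%{table}" expands to "namespace_demo_test1".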
+ if (conf.topic.contains("namespace")) {
+ String topic = conf.topic;
+ for (String key : conf.tableMap.keySet()) {
+ String[] s = key.split("\\.");
+ String currTopic = topic.replace("%{database}", s[1]).replace("%{table}", s[2]);
+ topicSet.add(currTopic);
+ }
+ } else {
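+ // Convention: a topic whose name contains "ddl" carries DDL events and has no table map.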
+ if (conf.topic.contains("ddl")) {
+ this.isDDL = true;
+ }
+ topicSet.add(conf.topic);
+ }
+ for (String curr : topicSet) {
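+ // conf.topic is rewritten to the expanded name so each MaxwellPipeLine
+ // created below consumes the concrete topic.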
+ conf.topic = curr;
+ Iterator<PartitionInfo> iter = consumer.partitionsFor(curr).iterator();
+ int num = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ num++;
+ PipeLine pipeLine = new MaxwellPipeLine(cxt, conf, num);
+ cxt.pipeLines.add(pipeLine);
+ conf.pipeLines.add(pipeLine);
+ }
+ }
+ break;
+
+ case DEBEZIUM:
+ for (String sourceTable : conf.tableMap.keySet()) {
+ String topic = conf.topic + sourceTable.substring(sourceTable.indexOf("."));
+ PipeLine pipeLine = new DebeziumPipeLine(cxt, conf, topic);
+ cxt.pipeLines.add(pipeLine);
+ conf.pipeLines.add(pipeLine);
+ }
+ break;
+
+ default:
+ break;
+ }
}
- return live;
- }
- };
-
- cxt.register.register(MetricRegistry.name("All Pipeline Number"), allCount);
- cxt.register.register(MetricRegistry.name("All Live Pipeline Number"), liveCount);
- }
-
- /**
- * Start metrics reporter.
- *
- */
- protected void startReporter() {
- switch (cxt.conf.reporter) {
- case "console":
- consoleReporter = ConsoleReporter.forRegistry(cxt.register)
- .convertRatesTo(TimeUnit.SECONDS)
- .convertDurationsTo(TimeUnit.MILLISECONDS)
- .build();
- consoleReporter.start(cxt.conf.report_interval, TimeUnit.SECONDS);
- break;
- case "jmx":
- jmxReporter = JmxReporter.forRegistry(cxt.register).build();
- jmxReporter.start();
- break;
- default:
- break;
+
+ // pipeline state statistics
+ Gauge<Integer> allCount = new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return cxt.pipeLines.size();
+ }
+ };
+
+ Gauge<Integer> liveCount = new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ int live = 0;
+ for (PipeLine pipeline : cxt.pipeLines) {
+ if (pipeline.state == PipeLineState.NORMAL) {
+ live++;
+ }
+ }
+ return live;
+ }
+ };
+
+ cxt.register.register(MetricRegistry.name("All Pipeline Number"), allCount);
+ cxt.register.register(MetricRegistry.name("All Live Pipeline Number"), liveCount);
}
- }
-
- @Override
- public void init(DaemonContext context) throws Exception {
- logger.info("initialize Bireme daemon");
- this.context = context;
- try {
- parseCommandLine(context.getArguments());
- } catch (Exception e) {
- logger.fatal("start failed. Message: {}.", e.getMessage());
- logger.fatal("Stack Trace: ", e);
+
+ /**
+ * Start metrics reporter.
+ */
+ protected void startReporter() {
+ switch (cxt.conf.reporter) {
+ case "console":
+ consoleReporter = ConsoleReporter.forRegistry(cxt.register)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+ consoleReporter.start(cxt.conf.report_interval, TimeUnit.SECONDS);
+ break;
+ case "jmx":
+ jmxReporter = JmxReporter.forRegistry(cxt.register).build();
+ jmxReporter.start();
+ break;
+ default:
+ break;
+ }
}
- }
-
- @Override
- public void start() throws BiremeException {
- logger.info("start Bireme daemon.");
- try {
- getTableInfo();
- initLoaderConnections();
- } catch (BiremeException e) {
- logger.fatal("start failed. Message: {}.", e.getMessage());
- logger.fatal("Stack Trace: ", e);
- throw e;
+
+ @Override
+ public void init(DaemonContext context) throws Exception {
+ logger.info("initialize Bireme daemon");
+ this.context = context;
+ try {
+ parseCommandLine(context.getArguments());
+ } catch (Exception e) {
+ logger.fatal("start failed. Message: {}.", e.getMessage());
+ logger.fatal("Stack Trace: ", e);
+ }
}
- createPipeLine();
- cxt.startScheduler();
- startReporter();
+ @Override
+ public void start() throws BiremeException {
+ logger.info("start Bireme daemon.");
+ createPipeLine();
+ try {
+ if (!isDDL) {
+ getTableInfo();
+ }
+ initLoaderConnections();
+ } catch (BiremeException e) {
+ logger.fatal("start failed. Message: {}.", e.getMessage());
+ logger.fatal("Stack Trace: ", e);
+ throw e;
+ }
+ cxt.startScheduler();
+ startReporter();
- if (context != null) {
- cxt.startWatchDog(context.getController());
+ if (context != null) {
+ cxt.startWatchDog(context.getController());
+ }
}
- }
- @Override
- public void stop() {
- logger.info("stop Bireme daemon");
+ @Override
+ public void stop() {
+ logger.info("stop Bireme daemon");
- if (cxt == null) {
- return;
- }
+ if (cxt == null) {
+ return;
+ }
- cxt.stop = true;
- logger.info("set stop flag to true");
+ cxt.stop = true;
+ logger.info("set stop flag to true");
- cxt.waitForExit();
+ cxt.waitForExit();
- try {
- closeConnections();
- } catch (SQLException e) {
- logger.warn(e.getMessage());
- }
+ try {
+ closeConnections();
+ } catch (SQLException e) {
+ logger.warn(e.getMessage());
+ }
- logger.info("Bireme exit");
- }
-
- @Override
- public void destroy() {
- logger.info("destroy Bireme daemon");
- }
-
- /**
- * An entry to start {@code Bireme}.
- *
- * @param args arguments from command line.
- */
- public void entry(String[] args) {
- try {
- parseCommandLine(args);
- } catch (Exception e) {
- logger.fatal("Init failed: {}.", e.getMessage());
- logger.fatal("Stack Trace: ", e);
- System.err.println(e.getMessage());
- e.printStackTrace();
- System.exit(1);
+ logger.info("Bireme exit");
}
- try {
- start();
- cxt.waitForStop();
- } catch (Exception e) {
- logger.fatal("Bireme stop abnormally since: {}", e.getMessage());
- logger.fatal("Stack Trace: ", e);
+ @Override
+ public void destroy() {
+ logger.info("destroy Bireme daemon");
}
- cxt.waitForExit();
+ /**
+ * An entry to start {@code Bireme}.
+ *
+ * @param args arguments from command line.
+ */
+ public void entry(String[] args) {
+ try {
+ parseCommandLine(args);
+ } catch (Exception e) {
+ logger.fatal("Init failed: {}.", e.getMessage());
+ logger.fatal("Stack Trace: ", e);
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ System.exit(1);
+ }
- try {
- closeConnections();
- } catch (SQLException e) {
- logger.warn(e.getMessage());
- }
+ try {
+ start();
+ cxt.waitForStop();
+ } catch (Exception e) {
+ logger.fatal("Bireme stop abnormally since: {}", e.getMessage());
+ logger.fatal("Stack Trace: ", e);
+ }
- logger.info("Bireme exit");
- }
+ cxt.waitForExit();
- public static void main(String[] args) {
- Bireme service = new Bireme();
- service.entry(args);
- }
+ try {
+ closeConnections();
+ } catch (SQLException e) {
+ logger.warn(e.getMessage());
+ }
+
+ logger.info("Bireme exit");
+ }
+
+ public static void main(String[] args) {
+ Bireme service = new Bireme();
+ service.entry(args);
+ }
}
diff --git a/src/main/java/cn/hashdata/bireme/BiremeUtility.java b/src/main/java/cn/hashdata/bireme/BiremeUtility.java
index c2e5d4e..2ec826c 100644
--- a/src/main/java/cn/hashdata/bireme/BiremeUtility.java
+++ b/src/main/java/cn/hashdata/bireme/BiremeUtility.java
@@ -5,61 +5,61 @@
import java.sql.SQLException;
import java.util.Map.Entry;
+import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import cn.hashdata.bireme.Config.ConnectionConfig;
public class BiremeUtility {
- /**
- * Establish connection to database.
- *
- * @param conf configuration of the aimed database
- * @return the established connection
- * @throws BiremeException Failed to get connection
- */
- public static Connection jdbcConn(ConnectionConfig conf) throws BiremeException {
- Connection conn = null;
+ /**
+ * Establish connection to database.
+ *
+ * @param conf configuration of the aimed database
+ * @return the established connection
+ * @throws BiremeException Failed to get connection
+ */
+ public static Connection jdbcConn(ConnectionConfig conf) throws BiremeException {
+ Connection conn = null;
- try {
- conn = DriverManager.getConnection(conf.jdbcUrl, conf.user, conf.passwd);
- } catch (SQLException e) {
- throw new BiremeException("Fail to get connection.\n", e);
+ try {
+ conn = DriverManager.getConnection(conf.jdbcUrl, conf.user, conf.passwd);
+ } catch (SQLException e) {
+ throw new BiremeException("Fail to get connection.\n", e);
+ }
+
+ return conn;
}
- return conn;
- }
+ /**
+ * Given the key, return the json value as String, ignoring case considerations.
+ *
+ * @param data the JsonObject
+ * @param fieldName the key
+ * @return the value as String
+ * @throws BiremeException when the JsonObject doesn't have the key
+ */
+ public static String jsonGetIgnoreCase(JSONObject data, String fieldName) throws BiremeException {
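+ // Note: fastjson returns a Java null for an explicit JSON null, so null values
+ // now take the "not found" path below (gson's JsonNull handling was dropped).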
+ Object element = data.get(fieldName);
- /**
- * Given the key, return the json value as String, ignoring case considerations.
- * @param data the JsonObject
- * @param fieldName the key
- * @return the value as String
- * @throws BiremeException when the JsonObject doesn't have the key
- */
- public static String jsonGetIgnoreCase(JsonObject data, String fieldName) throws BiremeException {
- JsonElement element = data.get(fieldName);
+ if (element == null) {
+ for (Entry<String, Object> iter : data.entrySet()) {
+ String key = iter.getKey();
- if (element == null) {
- for (Entry iter : data.entrySet()) {
- String key = iter.getKey();
+ if (key.equalsIgnoreCase(fieldName)) {
+ element = iter.getValue();
+ break;
+ }
+ }
+ }
- if (key.equalsIgnoreCase(fieldName)) {
- element = iter.getValue();
- break;
+ if (element == null) {
+ throw new BiremeException(
+ "Not found. Record does not have a field named \"" + fieldName + "\".\n");
}
- }
- }
- if (element == null) {
- throw new BiremeException(
- "Not found. Record does not have a field named \"" + fieldName + "\".\n");
- }
+ return element.toString();
- if (element.isJsonNull()) {
- return null;
- } else {
- return element.getAsString();
}
- }
}
diff --git a/src/main/java/cn/hashdata/bireme/ChangeLoader.java b/src/main/java/cn/hashdata/bireme/ChangeLoader.java
index 03b365e..b2b2635 100644
--- a/src/main/java/cn/hashdata/bireme/ChangeLoader.java
+++ b/src/main/java/cn/hashdata/bireme/ChangeLoader.java
@@ -38,432 +38,434 @@
* connections to the database.
*
* @author yuze
- *
*/
public class ChangeLoader implements Callable<Long> {
- protected static final Long DELETE_TIMEOUT_NS = 10000000000L;
- protected static final Long NANOSECONDS_TO_SECONDS = 1000000000L;
-
- public Logger logger;
-
- protected boolean optimisticMode = true;
- protected Context cxt;
- protected Config conf;
- protected Connection conn;
- protected LinkedBlockingQueue> taskIn;
- protected Table table;
- protected LoadTask currentTask;
- protected ExecutorService copyThread;
-
- public String mappedTable;
-
- private Timer copyForDeleteTimer;
- private Timer deleteTimer;
- private Timer copyForInsertTimer;
- private Timer.Context timerCTX;
-
- /**
- * Create a new {@code ChangeLoader}.
- *
- * @param cxt the Bireme Context
- * @param pipeLine the {@code PipeLine} belongs to
- * @param mappedTable the target table
- * @param taskIn a queue to get {@code LoadTask}
- */
- public ChangeLoader(Context cxt, PipeLine pipeLine, String mappedTable,
- LinkedBlockingQueue> taskIn) {
- this.cxt = cxt;
- this.conf = cxt.conf;
- this.conn = null;
- this.mappedTable = mappedTable;
- this.table = cxt.tablesInfo.get(mappedTable);
- this.taskIn = taskIn;
- this.copyThread = Executors.newFixedThreadPool(1, new ThreadFactory() {
- public Thread newThread(Runnable r) {
- Thread t = Executors.defaultThreadFactory().newThread(r);
- t.setDaemon(true);
- return t;
- }
- });
-
- // add statistics
- Timer[] timers = pipeLine.stat.addTimerForLoader(mappedTable);
- copyForDeleteTimer = timers[0];
- deleteTimer = timers[1];
- copyForInsertTimer = timers[2];
-
- logger = pipeLine.logger;
- }
-
- /**
- * Get the task and copy it to target database
- *
- * @throws BiremeException load exception
- * @throws InterruptedException interrupted when load the task
- * @return if normally end, return 0
- */
- @Override
- public Long call() throws BiremeException, InterruptedException {
- while (!cxt.stop) {
- // get task
- if (currentTask == null) {
- currentTask = pollTask();
- }
-
- if (currentTask == null) {
- break;
- }
-
- // get connection
- conn = getConnection();
- if (conn == null) {
- logger.debug("Unable to get Connection.");
- break;
- }
-
- // Execute task and release connection. If failed, close the connection and abandon it.
- try {
- executeTask();
- releaseConnection();
- } catch (BiremeException e) {
- logger.error("Fail to execute task. Message: {}", e.getMessage());
+ protected static final Long DELETE_TIMEOUT_NS = 10000000000L;
+ protected static final Long NANOSECONDS_TO_SECONDS = 1000000000L;
+
+ public Logger logger;
+
+ protected boolean optimisticMode = true;
+ protected Context cxt;
+ protected Config conf;
+ protected Connection conn;
+ protected LinkedBlockingQueue<Future<LoadTask>> taskIn;
+ protected Table table;
+ protected LoadTask currentTask;
+ protected ExecutorService copyThread;
+
+ public String mappedTable;
+
+ private Timer copyForDeleteTimer;
+ private Timer deleteTimer;
+ private Timer copyForInsertTimer;
+ private Timer.Context timerCTX;
+
+ /**
+ * Create a new {@code ChangeLoader}.
+ *
+ * @param cxt the Bireme Context
+ * @param pipeLine the {@code PipeLine} belongs to
+ * @param mappedTable the target table
+ * @param taskIn a queue to get {@code LoadTask}
+ */
+ public ChangeLoader(Context cxt, PipeLine pipeLine, String mappedTable,
+ LinkedBlockingQueue<Future<LoadTask>> taskIn) {
+ this.cxt = cxt;
+ this.conf = cxt.conf;
+ this.conn = null;
+ this.mappedTable = mappedTable;
+ this.table = cxt.tablesInfo.get(mappedTable);
+ this.taskIn = taskIn;
+ this.copyThread = Executors.newFixedThreadPool(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setDaemon(true);
+ return t;
+ }
+ });
+
+ // add statistics
+ Timer[] timers = pipeLine.stat.addTimerForLoader(mappedTable);
+ copyForDeleteTimer = timers[0];
+ deleteTimer = timers[1];
+ copyForInsertTimer = timers[2];
+
+ logger = pipeLine.logger;
+ }
- try {
- conn.rollback();
- conn.close();
- } catch (Exception ignore) {
- logger.error("Fail to roll back after load exception. Message: {}", e.getMessage());
- throw e;
+ /**
+ * Get the task and copy it to target database
+ *
+ * @return if normally end, return 0
+ * @throws BiremeException load exception
+ * @throws InterruptedException interrupted when load the task
+ */
+ @Override
+ public Long call() throws BiremeException, InterruptedException {
+ while (!cxt.stop) {
+ // get task
+ if (currentTask == null) {
+ currentTask = pollTask();
+ }
+
+ if (currentTask == null) {
+ break;
+ }
+
+ // get connection
+ conn = getConnection();
+ if (conn == null) {
+ logger.debug("Unable to get Connection.");
+ break;
+ }
+
+ // Execute task and release connection. If failed, close the connection and abandon it.
+ try {
+ // Load the task into the target database.
+ executeTask();
+ releaseConnection();
+ } catch (BiremeException e) {
+ logger.error("Fail to execute task. Message: {}", e.getMessage());
+
+ try {
+ conn.rollback();
+ conn.close();
+ } catch (Exception ignore) {
+ logger.error("Fail to roll back after load exception. Message: {}", e.getMessage());
+ throw e;
+ }
+ throw e;
+
+ } finally {
+ currentTask.destory();
+ currentTask = null;
+ conn = null;
+ }
}
- throw e;
-
- } finally {
- currentTask.destory();
- currentTask = null;
- conn = null;
- }
- }
- return 0L;
- }
-
- /**
- * Check whether {@code Rows} have been merged to a task. If done, poll the task and return.
- *
- * @return a task need be loaded to database
- * @throws BiremeException merge task failed
- * @throws InterruptedException if the current thread was interrupted while waiting
- */
- protected LoadTask pollTask() throws BiremeException, InterruptedException {
- LoadTask task = null;
- Future head = taskIn.peek();
-
- if (head != null && head.isDone()) {
- taskIn.remove();
-
- try {
- task = head.get();
- } catch (ExecutionException e) {
- throw new BiremeException("Merge task failed.\n", e.getCause());
- }
+ return 0L;
}
- return task;
- }
-
- /**
- * Get connection to the destination database from connection pool.
- *
- * @return the connection
- * @throws BiremeException when unable to create temporary table
- */
- protected Connection getConnection() throws BiremeException {
- Connection connection = cxt.loaderConnections.poll();
- if (connection == null) {
- String message = "Unable to get Connection.";
- logger.fatal(message);
- throw new BiremeException(message);
+ /**
+ * Check whether {@code Rows} have been merged to a task. If done, poll the task and return.
+ *
+ * @return a task need be loaded to database
+ * @throws BiremeException merge task failed
+ * @throws InterruptedException if the current thread was interrupted while waiting
+ */
+ protected LoadTask pollTask() throws BiremeException, InterruptedException {
+ LoadTask task = null;
+ Future<LoadTask> head = taskIn.peek();
+
+ if (head != null && head.isDone()) {
+ taskIn.remove();
+
+ try {
+ task = head.get();
+ } catch (ExecutionException e) {
+ throw new BiremeException("Merge task failed.\n", e.getCause());
+ }
+ }
+
+ return task;
}
- HashSet temporaryTables = cxt.temporaryTables.get(connection);
+ /**
+ * Get connection to the destination database from connection pool.
+ *
+ * @return the connection
+ * @throws BiremeException when unable to create temporary table
+ */
+ protected Connection getConnection() throws BiremeException {
+ Connection connection = cxt.loaderConnections.poll();
+ if (connection == null) {
+ String message = "Unable to get Connection.";
+ logger.fatal(message);
+ throw new BiremeException(message);
+ }
- if (!temporaryTables.contains(mappedTable)) {
- createTemporaryTable(connection);
- temporaryTables.add(mappedTable);
- }
- return connection;
- }
-
- /**
- * Return the connection to connection pool.
- *
- */
- protected void releaseConnection() {
- cxt.loaderConnections.offer(conn);
- conn = null;
- }
-
- /**
- * Load the task to destination database. First load the delete set and then load the insert set.
- *
- * @throws BiremeException Wrap the exception when load the task
- * @throws InterruptedException if interrupted while waiting
- */
- protected void executeTask() throws BiremeException, InterruptedException {
- if (!currentTask.delete.isEmpty() || (!optimisticMode && !currentTask.insert.isEmpty())) {
- int size = currentTask.delete.size();
-
- if (!optimisticMode) {
- currentTask.delete.addAll(currentTask.insert.keySet());
- }
-
- if (executeDelete(currentTask.delete) <= size && optimisticMode == false) {
- optimisticMode = true;
-
- logger.info("Chang to optimistic mode.");
- }
- }
+ HashSet<String> temporaryTables = cxt.temporaryTables.get(connection);
- if (!currentTask.insert.isEmpty()) {
- HashSet insertSet = new HashSet();
- insertSet.addAll(currentTask.insert.values());
- executeInsert(insertSet);
+ if (!temporaryTables.contains(mappedTable)) {
+ createTemporaryTable(connection);
+ temporaryTables.add(mappedTable);
+ }
+ return connection;
}
- try {
- conn.commit();
- } catch (SQLException e) {
- String message = "commit failed.";
- throw new BiremeException(message, e);
+ /**
+ * Return the connection to connection pool.
+ */
+ protected void releaseConnection() {
+ cxt.loaderConnections.offer(conn);
+ conn = null;
}
- for (CommitCallback callback : currentTask.callbacks) {
- callback.done();
+ /**
+ * Load the task to destination database. First load the delete set and then load the insert set.
+ *
+ * @throws BiremeException Wrap the exception when load the task
+ * @throws InterruptedException if interrupted while waiting
+ */
+ protected void executeTask() throws BiremeException, InterruptedException {
+ if (!currentTask.delete.isEmpty() || (!optimisticMode && !currentTask.insert.isEmpty())) {
+ int size = currentTask.delete.size();
+
+ if (!optimisticMode) {
+ currentTask.delete.addAll(currentTask.insert.keySet());
+ }
+
+ if (executeDelete(currentTask.delete) <= size && !optimisticMode) {
+ optimisticMode = true;
+
+ logger.info("Chang to optimistic mode.");
+ }
+ }
+
+ if (!currentTask.insert.isEmpty()) {
+ HashSet<String> insertSet = new HashSet<String>();
+ insertSet.addAll(currentTask.insert.values());
+ executeInsert(insertSet);
+ }
+
+ try {
+ conn.commit();
+ } catch (SQLException e) {
+ String message = "commit failed.";
+ throw new BiremeException(message, e);
+ }
+
+ for (CommitCallback callback : currentTask.callbacks) {
+ callback.done();
+ }
}
- }
- private Long executeDelete(Set delete) throws BiremeException, InterruptedException {
- long deleteCounts;
- ArrayList keyNames = table.keyNames;
- String temporaryTableName = getTemporaryTableName();
+ private Long executeDelete(Set<String> delete) throws BiremeException, InterruptedException {
+ long deleteCounts;
+ ArrayList<String> keyNames = table.keyNames;
+ String temporaryTableName = getTemporaryTableName();
- timerCTX = copyForDeleteTimer.time();
- copyWorker(temporaryTableName, keyNames, delete);
- timerCTX.stop();
+ timerCTX = copyForDeleteTimer.time();
+ copyWorker(temporaryTableName, keyNames, delete);
+ timerCTX.stop();
- timerCTX = deleteTimer.time();
- deleteCounts = deleteWorker(mappedTable, temporaryTableName, keyNames);
+ timerCTX = deleteTimer.time();
+ deleteCounts = deleteWorker(mappedTable, temporaryTableName, keyNames);
- long deleteTime = timerCTX.stop();
- if (deleteTime > DELETE_TIMEOUT_NS) {
- String plan = deletePlan(mappedTable, temporaryTableName, keyNames);
+ long deleteTime = timerCTX.stop();
+ if (deleteTime > DELETE_TIMEOUT_NS) {
+ String plan = deletePlan(mappedTable, temporaryTableName, keyNames);
- logger.warn("Delete operation takes {} seconds, delete plan:\n {}",
- deleteTime / NANOSECONDS_TO_SECONDS, plan);
- }
+ logger.warn("Delete operation takes {} seconds, delete plan:\n {}",
+ deleteTime / NANOSECONDS_TO_SECONDS, plan);
+ }
- return deleteCounts;
- }
+ return deleteCounts;
+ }
- private void executeInsert(Set insertSet) throws BiremeException, InterruptedException {
- ArrayList columnList = table.columnName;
+ private void executeInsert(Set<String> insertSet) throws BiremeException, InterruptedException {
+ ArrayList<String> columnList = table.columnName;
- timerCTX = copyForInsertTimer.time();
- try {
- copyWorker(mappedTable, columnList, insertSet);
- } catch (BiremeException e) {
- if (e.getCause().getMessage().contains("duplicate key value") && optimisticMode) {
+ timerCTX = copyForInsertTimer.time();
try {
- conn.rollback();
- } catch (SQLException ignore) {
+ copyWorker(mappedTable, columnList, insertSet);
+ } catch (BiremeException e) {
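+ // Optimistic mode copies without a pre-delete; on a duplicate-key error fall back
+ // to pessimistic mode: delete the conflicting keys first, then retry the insert.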
+ if (e.getCause().getMessage().contains("duplicate key value") && optimisticMode) {
+ try {
+ conn.rollback();
+ } catch (SQLException ignore) {
+ }
+
+ optimisticMode = false;
+
+ logger.info("Chang to passimistic mode.");
+
+ executeDelete(currentTask.insert.keySet());
+ executeInsert(insertSet);
+ } else {
+ throw e;
+ }
}
- optimisticMode = false;
+ timerCTX.stop();
+ }
- logger.info("Chang to passimistic mode.");
+ private Long copyWorker(String tableName, ArrayList<String> columnList, Set<String> tuples)
+ throws BiremeException, InterruptedException {
+ Future<Long> copyResult;
+ long copyCount = -1L;
+ PipedOutputStream pipeOut = new PipedOutputStream();
+ PipedInputStream pipeIn = null;
+ BiremeException temp = null;
- executeDelete(currentTask.insert.keySet());
- executeInsert(insertSet);
- } else {
- throw e;
- }
- }
+ try {
+ pipeIn = new PipedInputStream(pipeOut);
+ } catch (IOException e) {
+ throw new BiremeException("I/O error occurs while create PipedInputStream.", e);
+ }
- timerCTX.stop();
- }
-
- private Long copyWorker(String tableName, ArrayList columnList, Set tuples)
- throws BiremeException, InterruptedException {
- Future copyResult;
- long copyCount = -1L;
- PipedOutputStream pipeOut = new PipedOutputStream();
- PipedInputStream pipeIn = null;
- BiremeException temp = null;
-
- try {
- pipeIn = new PipedInputStream(pipeOut);
- } catch (IOException e) {
- throw new BiremeException("I/O error occurs while create PipedInputStream.", e);
- }
+ String sql = getCopySql(tableName, columnList);
+ copyResult = copyThread.submit(new TupleCopyer(pipeIn, sql, conn));
- String sql = getCopySql(tableName, columnList);
- copyResult = copyThread.submit(new TupleCopyer(pipeIn, sql, conn));
+ try {
+ tupleWriter(pipeOut, tuples);
+ } catch (BiremeException e) {
+ temp = e;
+ }
- try {
- tupleWriter(pipeOut, tuples);
- } catch (BiremeException e) {
- temp = e;
- }
+ try {
+ while (!copyResult.isDone() && !cxt.stop) {
+ Thread.sleep(1);
+ }
- try {
- while (!copyResult.isDone() && !cxt.stop) {
- Thread.sleep(1);
- }
+ copyCount = copyResult.get();
+ } catch (ExecutionException e) {
+ throw new BiremeException("Copy failed.", e.getCause());
+ }
- copyCount = copyResult.get();
- } catch (ExecutionException e) {
- throw new BiremeException("Copy failed.", e.getCause());
- }
+ if (temp != null) {
+ throw temp;
+ }
- if (temp != null) {
- throw temp;
+ return copyCount;
}
- return copyCount;
- }
-
- private String getCopySql(String tableName, List columnList) {
- StringBuilder sb =
- new StringBuilder()
- .append("COPY ")
- .append(tableName)
- .append(" (")
- .append(StringUtils.join(columnList, ","))
- .append(") FROM STDIN WITH DELIMITER '|' NULL '' CSV QUOTE '\"' ESCAPE E'\\\\';");
- String sql = sb.toString();
- return sql;
- }
-
- private Long deleteWorker(String table, String tmpTable, ArrayList columnList)
- throws BiremeException {
- StringBuilder sb = new StringBuilder();
- Long count = 0L;
-
- for (int i = 0; i < columnList.size(); i++) {
- if (i != 0) {
- sb.append(" and ");
- }
-
- sb.append(table + "." + columnList.get(i) + "=" + tmpTable + "." + columnList.get(i));
+ private String getCopySql(String tableName, List<String> columnList) {
+ StringBuilder sb =
+ new StringBuilder()
+ .append("COPY ")
+ .append(tableName)
+ .append(" (")
+ .append(StringUtils.join(columnList, ","))
+ .append(") FROM STDIN WITH DELIMITER '|' NULL '' CSV QUOTE '\"' ESCAPE E'\\\\';");
+ String sql = sb.toString();
+ return sql;
}
- String sql = "DELETE FROM " + table + " WHERE EXISTS (SELECT 1 FROM " + tmpTable + " WHERE "
- + sb.toString() + ");";
+ private Long deleteWorker(String table, String tmpTable, ArrayList<String> columnList)
+ throws BiremeException {
+ StringBuilder sb = new StringBuilder();
+ Long count = 0L;
- try {
- count = (long) conn.createStatement().executeUpdate(sql);
- } catch (SQLException e) {
- throw new BiremeException("Delete failed.", e);
- }
+ for (int i = 0; i < columnList.size(); i++) {
+ if (i != 0) {
+ sb.append(" and ");
+ }
- return count;
- }
+ sb.append(table + "." + columnList.get(i) + "=" + tmpTable + "." + columnList.get(i));
+ }
- private String deletePlan(String table, String tmpTable, ArrayList columnList)
- throws BiremeException {
- StringBuilder sb = new StringBuilder();
+ String sql = "DELETE FROM " + table + " WHERE EXISTS (SELECT 1 FROM " + tmpTable + " WHERE "
+ + sb.toString() + ");";
- for (int i = 0; i < columnList.size(); i++) {
- if (i != 0) {
- sb.append(" and ");
- }
+ try {
+ count = (long) conn.createStatement().executeUpdate(sql);
+ } catch (SQLException e) {
+ throw new BiremeException("Delete failed.", e);
+ }
- sb.append(table + "." + columnList.get(i) + "=" + tmpTable + "." + columnList.get(i));
+ return count;
}
- String sql = "EXPLAIN DELETE FROM " + table + " WHERE EXISTS (SELECT 1 FROM " + tmpTable
- + " WHERE " + sb.toString() + ");";
-
- try {
- ResultSet rs = conn.createStatement().executeQuery(sql);
+ private String deletePlan(String table, String tmpTable, ArrayList<String> columnList)
+ throws BiremeException {
+ StringBuilder sb = new StringBuilder();
- if (!rs.wasNull()) {
- sb.setLength(0);
+ for (int i = 0; i < columnList.size(); i++) {
+ if (i != 0) {
+ sb.append(" and ");
+ }
- while (rs.next()) {
- sb.append(rs.getString(1) + "\n");
+ sb.append(table + "." + columnList.get(i) + "=" + tmpTable + "." + columnList.get(i));
}
- return sb.toString();
+ String sql = "EXPLAIN DELETE FROM " + table + " WHERE EXISTS (SELECT 1 FROM " + tmpTable
+ + " WHERE " + sb.toString() + ");";
+
+ try {
+ ResultSet rs = conn.createStatement().executeQuery(sql);
+
+ if (!rs.wasNull()) {
+ sb.setLength(0);
+
+ while (rs.next()) {
+ sb.append(rs.getString(1) + "\n");
+ }
+
+ return sb.toString();
- } else {
- return "Can not get plan.";
- }
+ } else {
+ return "Can not get plan.";
+ }
- } catch (SQLException e) {
- throw new BiremeException("Fail to get delete plan.", e);
+ } catch (SQLException e) {
+ throw new BiremeException("Fail to get delete plan.", e);
+ }
}
- }
- private class TupleCopyer implements Callable {
- PipedInputStream pipeIn;
- String sql;
- Connection conn;
+ private class TupleCopyer implements Callable<Long> {
+ PipedInputStream pipeIn;
+ String sql;
+ Connection conn;
+
+ public TupleCopyer(PipedInputStream pipeIn, String sql, Connection conn) {
+ this.pipeIn = pipeIn;
+ this.sql = sql;
+ this.conn = conn;
+ }
- public TupleCopyer(PipedInputStream pipeIn, String sql, Connection conn) {
- this.pipeIn = pipeIn;
- this.sql = sql;
- this.conn = conn;
+ @Override
+ public Long call() throws SQLException, IOException {
+ try {
+ CopyManager mgr = new CopyManager((BaseConnection) conn);
+ return mgr.copyIn(sql, pipeIn);
+ } finally {
+ try {
+ pipeIn.close();
+ } catch (IOException ignore) {
+ }
+ }
+ }
}
- @Override
- public Long call() throws SQLException, IOException {
- try {
- CopyManager mgr = new CopyManager((BaseConnection) conn);
- return mgr.copyIn(sql, pipeIn);
- } finally {
+ private void tupleWriter(PipedOutputStream pipeOut, Set<String> tuples) throws BiremeException {
+ byte[] data = null;
+
try {
- pipeIn.close();
- } catch (IOException ignore) {
+ Iterator<String> iterator = tuples.iterator();
+
+ while (iterator.hasNext() && !cxt.stop) {
+ data = iterator.next().getBytes("UTF-8");
+ pipeOut.write(data);
+ }
+
+ pipeOut.flush();
+ } catch (IOException e) {
+ throw new BiremeException("I/O error occurs while write to pipe.", e);
+ } finally {
+ try {
+ pipeOut.close();
+ } catch (IOException ignore) {
+ }
}
- }
- }
- }
-
- private void tupleWriter(PipedOutputStream pipeOut, Set tuples) throws BiremeException {
- byte[] data = null;
-
- try {
- Iterator iterator = tuples.iterator();
-
- while (iterator.hasNext() && !cxt.stop) {
- data = iterator.next().getBytes("UTF-8");
- pipeOut.write(data);
- }
-
- pipeOut.flush();
- } catch (IOException e) {
- throw new BiremeException("I/O error occurs while write to pipe.", e);
- } finally {
- try {
- pipeOut.close();
- } catch (IOException ignore) {
- }
}
- }
- private String getTemporaryTableName() {
- return mappedTable.replace('.', '_');
- }
+ private String getTemporaryTableName() {
+ return mappedTable.replace('.', '_');
+ }
- private void createTemporaryTable(Connection conn) throws BiremeException {
- String sql = "CREATE TEMP TABLE " + getTemporaryTableName()
- + " ON COMMIT DELETE ROWS AS SELECT * FROM " + mappedTable + " LIMIT 0;";
+ private void createTemporaryTable(Connection conn) throws BiremeException {
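+ // The staging table mirrors the target's schema; ON COMMIT DELETE ROWS empties it after each batch.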
+ String sql = "CREATE TEMP TABLE " + getTemporaryTableName()
+ + " ON COMMIT DELETE ROWS AS SELECT * FROM " + mappedTable + " LIMIT 0;";
- try {
- conn.createStatement().executeUpdate(sql);
- conn.commit();
- } catch (SQLException e) {
- throw new BiremeException("Fail to create tmporary table.", e);
+ try {
+ conn.createStatement().executeUpdate(sql);
+ conn.commit();
+ } catch (SQLException e) {
+ throw new BiremeException("Fail to create tmporary table.", e);
+ }
}
- }
}
diff --git a/src/main/java/cn/hashdata/bireme/Column.java b/src/main/java/cn/hashdata/bireme/Column.java
new file mode 100644
index 0000000..eda55fc
--- /dev/null
+++ b/src/main/java/cn/hashdata/bireme/Column.java
@@ -0,0 +1,15 @@
+package cn.hashdata.bireme;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
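+/**
+ * One column definition from a DDL change event (presumably parsed from Maxwell's
+ * DDL JSON). Lombok generates the getters, setters and no-args constructor.
+ */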
+@Getter
+@Setter
+@NoArgsConstructor
+public class Column {
+ private String type;
+ private String name;
+ private boolean signed;
+ private String charset;
+}
\ No newline at end of file
diff --git a/src/main/java/cn/hashdata/bireme/Config.java b/src/main/java/cn/hashdata/bireme/Config.java
index 56b8cd4..fc8d56a 100644
--- a/src/main/java/cn/hashdata/bireme/Config.java
+++ b/src/main/java/cn/hashdata/bireme/Config.java
@@ -58,6 +58,8 @@ public class Config {
public HashMap<String, SourceConfig> sourceConfig;
public HashMap<String, String> tableMap;
+ public boolean isDDL = false;
+
public static class ConnectionConfig {
public String jdbcUrl;
public String user;
@@ -177,9 +179,20 @@ protected void fetchSourceAndTableMap() throws BiremeException, ConfigurationExc
throw new BiremeException(message);
}
- conf.tableMap = fetchTableMap(conf.name);
+ // A DDL source ships no table map, so there is no per-source properties file to load.
+ if (!conf.topic.contains("ddl")) {
+ conf.tableMap = fetchTableMap(conf.name);
+ } else {
+ this.isDDL = true;
+ }
}
+ // Cap the loader connection pool (default 10) at the number of distinct target tables.
if (loader_conn_size > loadersCount) {
loader_conn_size = loadersCount;
}
@@ -262,6 +275,9 @@ private HashMap<String, String> fetchTableMap(String dataSource)
localTableMap.put(dataSource + "." + originTable, mappedTable);
+ // Count distinct target tables; each one gets its own loader JDBC connection.
if (!tableMap.values().contains(mappedTable)) {
loadersCount++;
}
diff --git a/src/main/java/cn/hashdata/bireme/Context.java b/src/main/java/cn/hashdata/bireme/Context.java
index da212ba..0ed03d2 100644
--- a/src/main/java/cn/hashdata/bireme/Context.java
+++ b/src/main/java/cn/hashdata/bireme/Context.java
@@ -29,174 +29,177 @@
* bireme context.
*
* @author yuze
- *
*/
public class Context {
- static final protected Long TIMEOUT_MS = 1000L;
+ static final protected Long TIMEOUT_MS = 1000L;
- public volatile boolean stop = false;
+ public volatile boolean stop = false;
- public Config conf;
- public MetricRegistry register = new MetricRegistry();
+ public Config conf;
+ public MetricRegistry register = new MetricRegistry();
- public HashMap tableMap;
- public HashMap tablesInfo;
+ public HashMap<String, String> tableMap;
+ public HashMap<String, Table> tablesInfo;
- public LinkedBlockingQueue loaderConnections;
- public HashMap> temporaryTables;
+ public LinkedBlockingQueue<Connection> loaderConnections;
+ public HashMap<Connection, HashSet<String>> temporaryTables;
- public ArrayList pipeLines;
+ public ArrayList<PipeLine> pipeLines;
- public ExecutorService schedule;
- public Future scheduleResult;
- public ExecutorService pipeLinePool;
- public ExecutorService transformerPool;
- public ExecutorService mergerPool;
- public ExecutorService loaderPool;
+ public ExecutorService schedule;
+ public Future<Long> scheduleResult;
+ public ExecutorService pipeLinePool;
+ public ExecutorService transformerPool;
+ public ExecutorService mergerPool;
+ public ExecutorService loaderPool;
- public WatchDog watchDog;
+ public WatchDog watchDog;
- public StateServer server;
+ public StateServer server;
- public Logger logger = LogManager.getLogger("Bireme.Context");
+ public Logger logger = LogManager.getLogger("Bireme.Context");
- /**
- * Create a new bireme context for test.
- *
- * @param conf bireme configuration
- * @throws BiremeException Unknown Host
- */
- public Context(Config conf) throws BiremeException {
- this.conf = conf;
+ /**
+ * Create a new bireme context for test.
+ *
+ * @param conf bireme configuration
+ * @throws BiremeException Unknown Host
+ */
+ public Context(Config conf) throws BiremeException {
+ this.conf = conf;
- this.tableMap = conf.tableMap;
- this.tablesInfo = new HashMap();
+ this.tableMap = conf.tableMap;
+ this.tablesInfo = new HashMap<String, Table>();
- this.loaderConnections = new LinkedBlockingQueue(conf.loader_conn_size);
- this.temporaryTables = new HashMap>();
+ if (!conf.isDDL) {
+ this.loaderConnections = new LinkedBlockingQueue<Connection>(conf.loader_conn_size);
+ this.temporaryTables = new HashMap<Connection, HashSet<String>>();
+ }
- this.pipeLines = new ArrayList();
- this.schedule = Executors.newSingleThreadExecutor(new BiremeThreadFactory("Scheduler"));
+ this.pipeLines = new ArrayList<PipeLine>();
+ this.schedule = Executors.newSingleThreadExecutor(new BiremeThreadFactory("Scheduler"));
- this.server = new StateServer(this, conf.state_server_addr, conf.state_server_port);
+ this.server = new StateServer(this, conf.state_server_addr, conf.state_server_port);
- createThreadPool();
- }
+ createThreadPool();
+ }
- private void createThreadPool() {
- pipeLinePool =
- Executors.newFixedThreadPool(conf.pipeline_pool_size, new BiremeThreadFactory("PipeLine"));
- transformerPool = Executors.newFixedThreadPool(
- conf.transform_pool_size, new BiremeThreadFactory("Transformer"));
- mergerPool =
- Executors.newFixedThreadPool(conf.merge_pool_size, new BiremeThreadFactory("Merger"));
- loaderPool =
- Executors.newFixedThreadPool(conf.loader_conn_size, new BiremeThreadFactory("Loader"));
- }
+ private void createThreadPool() {
+ pipeLinePool =
+ Executors.newFixedThreadPool(conf.pipeline_pool_size, new BiremeThreadFactory("PipeLine"));
+ transformerPool = Executors.newFixedThreadPool(
+ conf.transform_pool_size, new BiremeThreadFactory("Transformer"));
+ mergerPool =
+ Executors.newFixedThreadPool(conf.merge_pool_size, new BiremeThreadFactory("Merger"));
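+ // In DDL mode no loader pool is created; later shutdown calls must guard for null.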
+ if (!conf.isDDL) {
+ loaderPool =
+ Executors.newFixedThreadPool(conf.loader_conn_size, new BiremeThreadFactory("Loader"));
+ }
- class WatchDog extends Thread {
- private DaemonController controller;
- private Context cxt;
+ }
- public WatchDog(DaemonController controller, Context cxt) {
- this.controller = controller;
- this.cxt = cxt;
- this.setDaemon(true);
- this.setName("WatchDog");
+ class WatchDog extends Thread {
+ private DaemonController controller;
+ private Context cxt;
+
+ public WatchDog(DaemonController controller, Context cxt) {
+ this.controller = controller;
+ this.cxt = cxt;
+ this.setDaemon(true);
+ this.setName("WatchDog");
+ }
+
+ @Override
+ public void run() {
+ try {
+ cxt.waitForStop();
+ } catch (InterruptedException e) {
+ controller.fail("Service stopped by user");
+
+ } catch (BiremeException e) {
+ logger.fatal("Bireme stop abnormally since: {}", e.getMessage());
+ logger.fatal("Stack Trace: ", e);
+ controller.fail(e);
+ }
+ }
}
- @Override
- public void run() {
- try {
- cxt.waitForStop();
- } catch (InterruptedException e) {
- controller.fail("Service stopped by user");
-
- } catch (BiremeException e) {
- logger.fatal("Bireme stop abnormally since: {}", e.getMessage());
- logger.fatal("Stack Trace: ", e);
- controller.fail(e);
- }
+ public void startWatchDog(DaemonController controller) {
+ watchDog = new WatchDog(controller, this);
+ watchDog.start();
}
- }
-
- public void startWatchDog(DaemonController controller) {
- watchDog = new WatchDog(controller, this);
- watchDog.start();
- }
-
- /**
- * Start the {@code Scheduler} to schedule the {@code PipeLines} to work.
- */
- public void startScheduler() {
- scheduleResult = schedule.submit(new Scheduler(this));
- server.start();
- }
-
- /**
- * Wait for bireme to stop and shout down all thread pool.
- *
- * @throws InterruptedException if interrupted while waiting
- * @throws BiremeException scheduler throw out Exception
- */
- public void waitForStop() throws BiremeException, InterruptedException {
- try {
- while (!scheduleResult.isDone()) {
- Thread.sleep(1);
- }
- scheduleResult.get();
- } catch (ExecutionException e) {
- throw new BiremeException("Scheduler abnormally stopped.", e.getCause());
-
- } finally {
- server.stop();
- schedule.shutdownNow();
- pipeLinePool.shutdownNow();
- transformerPool.shutdownNow();
- mergerPool.shutdownNow();
- loaderPool.shutdownNow();
+
+ /**
+ * Start the {@code Scheduler} to schedule the {@code PipeLines} to work.
+ */
+ public void startScheduler() {
+ scheduleResult = schedule.submit(new Scheduler(this));
+ server.start();
}
- }
-
- /**
- * Wait for all thread pool to terminate.
- */
- public void waitForExit() {
- try {
- schedule.awaitTermination(1, TimeUnit.MINUTES);
- pipeLinePool.awaitTermination(1, TimeUnit.MINUTES);
- transformerPool.awaitTermination(1, TimeUnit.MINUTES);
- mergerPool.awaitTermination(1, TimeUnit.MINUTES);
- loaderPool.awaitTermination(1, TimeUnit.MINUTES);
- } catch (InterruptedException ignore) {
+
+ /**
+ * Wait for bireme to stop and shut down all thread pools.
+ *
+ * @throws InterruptedException if interrupted while waiting
+ * @throws BiremeException scheduler throw out Exception
+ */
+ public void waitForStop() throws BiremeException, InterruptedException {
+ try {
+ while (!scheduleResult.isDone()) {
+ Thread.sleep(1);
+ }
+ scheduleResult.get();
+ } catch (ExecutionException e) {
+ throw new BiremeException("Scheduler abnormally stopped.", e.getCause());
+
+ } finally {
+ server.stop();
+ schedule.shutdownNow();
+ pipeLinePool.shutdownNow();
+ transformerPool.shutdownNow();
+ mergerPool.shutdownNow();
+ if (loaderPool != null) { // not created in DDL mode
+ loaderPool.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Wait for all thread pool to terminate.
+ */
+ public void waitForExit() {
+ try {
+ schedule.awaitTermination(1, TimeUnit.MINUTES);
+ pipeLinePool.awaitTermination(1, TimeUnit.MINUTES);
+ transformerPool.awaitTermination(1, TimeUnit.MINUTES);
+ mergerPool.awaitTermination(1, TimeUnit.MINUTES);
+ if (loaderPool != null) { // not created in DDL mode
+ loaderPool.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ } catch (InterruptedException ignore) {
+ }
}
- }
}
/**
* For create new Thread.
*
* @author yuze
- *
*/
class BiremeThreadFactory implements ThreadFactory {
- private final ThreadGroup group;
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final String namePrefix;
-
- public BiremeThreadFactory(String poolName) {
- SecurityManager s = System.getSecurityManager();
- group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
- namePrefix = "pool-" + poolName + "-thread-";
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
- if (t.isDaemon())
- t.setDaemon(false);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
+ private final ThreadGroup group;
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final String namePrefix;
+
+ public BiremeThreadFactory(String poolName) {
+ SecurityManager s = System.getSecurityManager();
+ group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+ namePrefix = "pool-" + poolName + "-thread-";
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+ if (t.isDaemon())
+ t.setDaemon(false);
+ if (t.getPriority() != Thread.NORM_PRIORITY)
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
}
diff --git a/src/main/java/cn/hashdata/bireme/Dispatcher.java b/src/main/java/cn/hashdata/bireme/Dispatcher.java
index 4b2b13f..72de08a 100644
--- a/src/main/java/cn/hashdata/bireme/Dispatcher.java
+++ b/src/main/java/cn/hashdata/bireme/Dispatcher.java
@@ -1,6 +1,8 @@
package cn.hashdata.bireme;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -9,100 +11,108 @@
import java.util.concurrent.LinkedBlockingQueue;
import cn.hashdata.bireme.pipeline.PipeLine;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* A {@code Dispatcher} is binded with a {@code PipeLine}. It get the transform result and insert
* into cache.
*
* @author yuze
- *
*/
public class Dispatcher {
- public Context cxt;
- public PipeLine pipeLine;
- public RowSet rowSet;
- public boolean complete;
- public LinkedBlockingQueue> transResult;
- public ConcurrentHashMap cache;
-
- public Dispatcher(Context cxt, PipeLine pipeLine) {
- this.cxt = cxt;
- this.pipeLine = pipeLine;
- this.rowSet = null;
- this.complete = false;
- this.transResult = pipeLine.transResult;
- this.cache = pipeLine.cache;
- }
-
- /**
- * Get the transform result and dispatch.
- *
- * @throws BiremeException transform failed
- * @throws InterruptedException if the current thread was interrupted while waiting
- */
- public void dispatch() throws BiremeException, InterruptedException {
- if (rowSet != null) {
- complete = insertRowSet();
-
- if (!complete) {
- return;
- }
+ public Context cxt;
+ public PipeLine pipeLine;
+ public RowSet rowSet;
+ public boolean complete;
+ public LinkedBlockingQueue<Future<RowSet>> transResult;
+ public ConcurrentHashMap<String, RowCache> cache;
+
+ public Logger logger;
+
+ public Dispatcher(Context cxt, PipeLine pipeLine) {
+ this.cxt = cxt;
+ this.pipeLine = pipeLine;
+ this.rowSet = null;
+ this.complete = false;
+ this.transResult = pipeLine.transResult;
+ this.cache = pipeLine.cache;
+ // initialize the logger (it was previously declared but never assigned)
+ this.logger = LogManager.getLogger("Bireme.Dispatcher");
}
- while (!transResult.isEmpty() && !cxt.stop) {
- Future head = transResult.peek();
-
- if (head.isDone()) {
- transResult.remove();
- try {
- rowSet = head.get();
- } catch (ExecutionException e) {
- throw new BiremeException("Transform failed.\n", e.getCause());
+ /**
+ * Get the transform result and dispatch.
+ *
+ * @throws BiremeException transform failed
+ * @throws InterruptedException if the current thread was interrupted while waiting
+ */
+ public void dispatch() throws BiremeException, InterruptedException {
+ if (rowSet != null) {
+ complete = insertRowSet();
+ if (!complete) {
+ return;
+ }
}
- complete = insertRowSet();
-
- if (!complete) {
- break;
+ while (!transResult.isEmpty() && !cxt.stop) {
+ Future<RowSet> head = transResult.peek();
+ if (head.isDone()) {
+ transResult.remove();
+ try {
+ rowSet = head.get();
+ } catch (ExecutionException e) {
+ throw new BiremeException("Transform failed.\n", e.getCause());
+ }
+
+ if (rowSet != null) {
+ // log the dispatched rows at debug level rather than printing to stdout
+ if (logger.isDebugEnabled()) {
+ rowSet.rowBucket.forEach((k, v) ->
+ logger.debug("row-key:{}, row-value:{}", k, Arrays.toString(v.toArray())));
+ }
+ complete = insertRowSet();
+ }
+
+ if (!complete) {
+ break;
+ }
+ } else {
+ break;
+ }
}
- } else {
- break;
- }
}
- }
-
- private boolean insertRowSet() {
- HashMap> bucket = rowSet.rowBucket;
- boolean complete = true;
-
- ArrayList>> entrySet =
- new ArrayList>>();
- entrySet.addAll(bucket.entrySet());
-
- for (Entry> entry : entrySet) {
- String fullTableName = entry.getKey();
- ArrayList rows = entry.getValue();
- RowCache rowCache = cache.get(fullTableName);
- if (rowCache == null) {
- rowCache = new RowCache(cxt, fullTableName, pipeLine);
- cache.put(fullTableName, rowCache);
- }
-
- complete = rowCache.addRows(rows, rowSet.callback);
- if (!complete) {
- break;
- }
+ private boolean insertRowSet() {
+ HashMap<String, ArrayList<Row>> bucket = rowSet.rowBucket;
+ boolean complete = true;
+
+ ArrayList<Entry<String, ArrayList<Row>>> entrySet =
+ new ArrayList<Entry<String, ArrayList<Row>>>();
+ entrySet.addAll(bucket.entrySet());
+
+ for (Entry<String, ArrayList<Row>> entry : entrySet) {
+ String fullTableName = entry.getKey();
+ // skip rows that have no mapped target table (e.g. DDL-only rows)
+ if (null == fullTableName || "".equals(fullTableName)) {
+ continue;
+ }
+ ArrayList<Row> rows = entry.getValue();
+ RowCache rowCache = cache.get(fullTableName);
+
+ if (rowCache == null) {
+ rowCache = new RowCache(cxt, fullTableName, pipeLine);
+ cache.put(fullTableName, rowCache);
+ }
+
+ complete = rowCache.addRows(rows, rowSet.callback);
+ if (!complete) {
+ break;
+ }
+
+ bucket.remove(fullTableName);
+ rows.clear();
+ }
- bucket.remove(fullTableName);
- rows.clear();
- }
+ if (complete) {
+ rowSet.destory();
+ rowSet = null;
+ }
- if (complete) {
- rowSet.destory();
- rowSet = null;
+ return complete;
}
-
- return complete;
- }
}
diff --git a/src/main/java/cn/hashdata/bireme/Row.java b/src/main/java/cn/hashdata/bireme/Row.java
index 5351fad..566159f 100644
--- a/src/main/java/cn/hashdata/bireme/Row.java
+++ b/src/main/java/cn/hashdata/bireme/Row.java
@@ -4,6 +4,7 @@
package cn.hashdata.bireme;
+
/**
* {@code Row} is a bireme inner format to represent operations to a table. It is transformed from
* the data polled from any data source. {@code Row} is supposed to contain information about
@@ -14,12 +15,38 @@
*/
public class Row {
public enum RowType { INSERT, UPDATE, DELETE }
+ public enum DDLType { CREATE_DATABASE, DROP_DATABASE, ALTER_DATABASE, CREATE_TABLE, DROP_TABLE, ALTER_TABLE }
public Long produceTime;
public RowType type;
+ public DDLType ddlType;
public String originTable;
public String mappedTable;
public String keys;
public String oldKeys;
public String tuple;
+ public String ddlSQL;
+ public String database;
+ public String table;
+ public TableSchema def;
+ public boolean isDDL = false;
+
+ @Override
+ public String toString() {
+ return "Row{" +
+ "produceTime=" + produceTime +
+ ", type=" + type +
+ ", ddlType=" + ddlType +
+ ", originTable='" + originTable + '\'' +
+ ", mappedTable='" + mappedTable + '\'' +
+ ", keys='" + keys + '\'' +
+ ", oldKeys='" + oldKeys + '\'' +
+ ", tuple='" + tuple + '\'' +
+ ", ddlSQL='" + ddlSQL + '\'' +
+ ", database='" + database + '\'' +
+ ", table='" + table + '\'' +
+ ", def=" + def +
+ ", isDDL=" + isDDL +
+ '}';
+ }
}
diff --git a/src/main/java/cn/hashdata/bireme/TableSchema.java b/src/main/java/cn/hashdata/bireme/TableSchema.java
new file mode 100644
index 0000000..60f6d45
--- /dev/null
+++ b/src/main/java/cn/hashdata/bireme/TableSchema.java
@@ -0,0 +1,34 @@
+package cn.hashdata.bireme;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+/**
+ * Table schema carried in the "def" field of a Maxwell DDL event, for example:
+ *
+ *   "def":{"database":"test","charset":"utf8mb4","table":"ddd",
+ *          "columns":[{"type":"int","name":"id","signed":true},
+ *                     {"type":"varchar","name":"daemon","charset":"utf8mb4"}],
+ *          "primary-key":["id"]}
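+ *
+ * For orientation, the surrounding Maxwell "table-create" event is assumed to look
+ * roughly like this (illustrative sample, not verbatim Maxwell output):
+ *
+ *   {"type":"table-create","database":"test","table":"ddd","ts":1552000000,
+ *    "sql":"CREATE TABLE ddd (id int, daemon varchar(25))",
+ *    "def":{ ... as above ... }}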
+ */
+@Slf4j
+@Getter
+@Setter
+@NoArgsConstructor
+public class TableSchema {
+
+
+ private String database;
+
+ private String table;
+
+ private String charset;
+
+ private List<Column> columns;
+
+ private List<String> primary_key;
+
+}
\ No newline at end of file
diff --git a/src/main/java/cn/hashdata/bireme/pipeline/DebeziumPipeLine.java b/src/main/java/cn/hashdata/bireme/pipeline/DebeziumPipeLine.java
index 2a1c02b..5328924 100644
--- a/src/main/java/cn/hashdata/bireme/pipeline/DebeziumPipeLine.java
+++ b/src/main/java/cn/hashdata/bireme/pipeline/DebeziumPipeLine.java
@@ -12,6 +12,7 @@
import java.util.TimeZone;
import java.util.stream.Collectors;
+import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -208,14 +209,14 @@ class DebeziumRecord implements Record {
public String topic;
public Long produceTime;
public RowType type;
- public JsonObject data;
+ public JSONObject data;
public DebeziumRecord(String topic, JsonObject payLoad) {
this.topic = topic;
char op = payLoad.get("op").getAsCharacter();
this.produceTime = payLoad.get("ts_ms").getAsLong();
- JsonElement element = null;
+ Object element = null;
switch (op) {
case 'r':
case 'c':
@@ -234,7 +235,7 @@ public DebeziumRecord(String topic, JsonObject payLoad) {
break;
}
- this.data = element.getAsJsonObject();
+ // NOTE: this cast assumes the case branches above now populate "element"
+ // with a fastjson JSONObject rather than a gson JsonElement.
+ this.data = (JSONObject) element;
}
@Override
diff --git a/src/main/java/cn/hashdata/bireme/pipeline/KafkaPipeLine.java b/src/main/java/cn/hashdata/bireme/pipeline/KafkaPipeLine.java
index fbc0bf4..26ba8e8 100644
--- a/src/main/java/cn/hashdata/bireme/pipeline/KafkaPipeLine.java
+++ b/src/main/java/cn/hashdata/bireme/pipeline/KafkaPipeLine.java
@@ -1,10 +1,12 @@
package cn.hashdata.bireme.pipeline;
+import java.sql.*;
+import java.util.*;
import java.util.Date;
-import java.util.HashMap;
-import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
+import cn.hashdata.bireme.*;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -14,196 +16,665 @@
import com.codahale.metrics.Timer;
-import cn.hashdata.bireme.AbstractCommitCallback;
-import cn.hashdata.bireme.BiremeException;
-import cn.hashdata.bireme.ChangeSet;
-import cn.hashdata.bireme.CommitCallback;
-import cn.hashdata.bireme.Context;
-import cn.hashdata.bireme.Row;
-import cn.hashdata.bireme.RowSet;
-
/**
* {@code KafkaPipeLine} is a kind of {@code PipeLine} that polls data from Kafka.
*
* @author yuze
- *
*/
public abstract class KafkaPipeLine extends PipeLine {
- private final long POLL_TIMEOUT = 100L;
+ private final long POLL_TIMEOUT = 100L;
+
+ protected KafkaConsumer<String, String> consumer;
+ protected LinkedBlockingQueue<CommitCallback> commitCallbacks;
+ public LinkedBlockingQueue<Connection> loaderConnections;
+
+ public KafkaPipeLine(Context cxt, SourceConfig conf, String myName) {
+ super(cxt, conf, myName);
+ consumer = KafkaPipeLine.createConsumer(conf.server, conf.groupID);
+ commitCallbacks = new LinkedBlockingQueue<CommitCallback>();
+ }
+
+ @Override
+ public ChangeSet pollChangeSet() throws BiremeException {
+ ConsumerRecords<String, String> records = null;
+
+ try {
+ records = consumer.poll(POLL_TIMEOUT);
+ } catch (InterruptException e) {
+ // ignored: an interrupted poll is treated like an empty poll
+ }
- protected KafkaConsumer consumer;
- protected LinkedBlockingQueue commitCallbacks;
+ if (cxt.stop || records == null || records.isEmpty()) {
+ return null;
+ }
+
+ KafkaCommitCallback callback = new KafkaCommitCallback();
- public KafkaPipeLine(Context cxt, SourceConfig conf, String myName) {
- super(cxt, conf, myName);
- consumer = KafkaPipeLine.createConsumer(conf.server, conf.groupID);
- commitCallbacks = new LinkedBlockingQueue();
- }
+ if (!commitCallbacks.offer(callback)) {
+ String message = "Can't add CommitCallback to queue.";
+ throw new BiremeException(message);
+ }
- @Override
- public ChangeSet pollChangeSet() throws BiremeException {
- ConsumerRecords records = null;
+ stat.recordCount.mark(records.count());
- try {
- records = consumer.poll(POLL_TIMEOUT);
- } catch (InterruptException e) {
+ return packRecords(records, callback);
}
- if (cxt.stop || records == null || records.isEmpty()) {
- return null;
+ @Override
+ public void checkAndCommit() {
+ CommitCallback callback = null;
+
+ while (!commitCallbacks.isEmpty()) {
+ if (commitCallbacks.peek().ready()) {
+ callback = commitCallbacks.remove();
+ } else {
+ break;
+ }
+ }
+
+ if (callback != null) {
+ callback.commit();
+ }
}
- KafkaCommitCallback callback = new KafkaCommitCallback();
+ private ChangeSet packRecords(
+ ConsumerRecords<String, String> records, KafkaCommitCallback callback) {
+ ChangeSet changeSet = new ChangeSet();
+ changeSet.createdAt = new Date();
+ changeSet.changes = records;
+ changeSet.callback = callback;
- if (!commitCallbacks.offer(callback)) {
- String Message = "Can't add CommitCallback to queue.";
- throw new BiremeException(Message);
+ return changeSet;
}
- stat.recordCount.mark(records.count());
+ /**
+ * Loop through the {@code ChangeSet} and transform each change record into a {@code Row}.
+ *
+ * @author yuze
+ */
+ public abstract class KafkaTransformer extends Transformer {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void fillRowSet(RowSet rowSet) throws BiremeException {
+ CommitCallback callback = changeSet.callback;
+ HashMap<String, Long> offsets = ((KafkaCommitCallback) callback).partitionOffset;
+ Row row = null;
+
+ // Iterate over the polled consumer records and process each message.
+ for (ConsumerRecord<String, String> change :
+ (ConsumerRecords<String, String>) changeSet.changes) {
+ row = new Row();
+
+ if (!transform(change, row)) {
+ continue;
+ }
+
+ addToRowSet(row, rowSet);
+ offsets.put(change.topic() + "+" + change.partition(), change.offset());
+ callback.setNewestRecord(row.produceTime);
+
+ // Apply DDL rows directly against the target database.
+ if (row.isDDL) {
+ // Check whether the target table already exists.
+ boolean tableExists = tableExists(row.table);
+
+ switch (row.ddlType) {
+ // create table
+ case CREATE_TABLE:
+ if (!tableExists) {
+ logger.info(createTable(row) ? "Table {} created." : "Failed to create table {}!", row.table);
+ }
+ break;
+ // drop table
+ case DROP_TABLE:
+ if (tableExists) {
+ logger.info(dropTable(row.table) ? "Table {} dropped." : "Failed to drop table {}!", row.table);
+ }
+ break;
+ // alter table
+ case ALTER_TABLE:
+ if (tableExists) {
+ logger.info(alterTable(row, row.table + "_tmp") ? "Table {} altered." : "Failed to alter table {}!", row.table);
+ }
+ break;
+ // create/drop/alter database: execute the SQL from the binlog as-is
+ case CREATE_DATABASE:
+ if (!existsDatabase(row.database)) {
+ execDataBaseDDL(row);
+ }
+ break;
+ case DROP_DATABASE:
+ case ALTER_DATABASE:
+ if (existsDatabase(row.database)) {
+ execDataBaseDDL(row);
+ }
+ break;
+ }
+ }
+ }
+ callback.setNumOfTables(rowSet.rowBucket.size());
+ rowSet.callback = callback;
+ }
- return packRecords(records, callback);
- }
- @Override
- public void checkAndCommit() {
- CommitCallback callback = null;
+ /**
+ * Check whether a database exists on the target.
+ * @author zhuhai
+ *
+ * @param database the database name
+ * @return true if the database exists
+ */
+ public boolean existsDatabase(String database) {
+ Connection conn = null;
+ try {
+ conn = BiremeUtility.jdbcConn(cxt.conf.targetDatabase);
+ } catch (BiremeException ex) {
+ ex.printStackTrace();
+ }
+ PreparedStatement stmt = null;
+ ResultSet rs = null;
+ String sql = "SELECT count(1) FROM pg_catalog.pg_database u where u.datname=?";
+ boolean exists = false;
+ try {
+ stmt = conn.prepareStatement(sql);
+ stmt.setString(1, database);
+ rs = stmt.executeQuery();
+ rs.next();
+ // count is 1 if the database exists, 0 otherwise
+ exists = rs.getInt(1) > 0;
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ } finally {
+ try {
+ if (null != rs) {
+ rs.close();
+ }
+ if (null != stmt) {
+ stmt.close();
+ }
+ if (null != conn) {
+ conn.close();
+ }
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ }
+ }
+ return exists;
+ }
- while (!commitCallbacks.isEmpty()) {
- if (commitCallbacks.peek().ready()) {
- callback = commitCallbacks.remove();
- } else {
- break;
- }
- }
+ /**
+ * Execute a database-level DDL statement taken from the binlog.
+ * @author zhuhai
+ *
+ * @param row a single consumed record
+ */
+ public void execDataBaseDDL(Row row) {
+ Connection conn = null;
+ Statement stmt = null;
+ try {
+ conn = BiremeUtility.jdbcConn(cxt.conf.targetDatabase);
+ stmt = conn.createStatement();
+ stmt.execute(row.ddlSQL);
+ } catch (BiremeException ex) {
+ ex.printStackTrace();
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ } finally {
+ try {
+ if (null != stmt) {
+ stmt.close();
+ }
+ if (null != conn) {
+ conn.close();
+ }
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ }
+ }
+ }
+
+
+ /**
+ * Check whether a table exists (only the public schema is inspected).
+ * @author zhuhai
+ *
+ * @param tableName the table name
+ * @return true if the table exists
+ */
+ public boolean tableExists(String tableName) {
+ Connection conn = null;
+ try {
+ conn = BiremeUtility.jdbcConn(cxt.conf.targetDatabase);
+ } catch (BiremeException ex) {
+ ex.printStackTrace();
+ }
+ PreparedStatement stmt = null;
+ ResultSet rs = null;
+ String sql = "select count(1) from information_schema.tables where table_schema=? and table_type=? and table_name=?";
+ boolean exists = false;
+ try {
+ stmt = conn.prepareStatement(sql);
+ stmt.setString(1, "public");
+ stmt.setString(2, "BASE TABLE");
+ stmt.setString(3, tableName);
+ rs = stmt.executeQuery();
+ rs.next();
+ // count is 1 if the table exists, 0 otherwise
+ exists = rs.getInt(1) > 0;
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ } finally {
+ try {
+ if (null != rs) {
+ rs.close();
+ }
+ if (null != stmt) {
+ stmt.close();
+ }
+ if (null != conn) {
+ conn.close();
+ }
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ }
+ }
+ return exists;
+ }
+
+ /**
+ * Drop a table.
+ * @author zhuhai
+ *
+ * @param tableName the table name
+ * @return true if the drop succeeded
+ */
+ public boolean dropTable(String tableName) {
+ Connection conn = null;
+ String sql = "drop table %s";
+ String finalSQL = String.format(sql, tableName);
+ Statement stmt = null;
+ try {
+ conn = BiremeUtility.jdbcConn(cxt.conf.targetDatabase);
+ conn.setAutoCommit(false);
+ stmt = conn.createStatement();
+ stmt.execute(finalSQL);
+ conn.commit();
+ } catch (BiremeException ex) {
+ ex.printStackTrace();
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ } finally {
+ try {
+ if (null != stmt) {
+ stmt.close();
+ }
+ if (null != conn) {
+ conn.close();
+ }
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ }
+ }
+ // verify the table is really gone after the drop
+ return !tableExists(tableName);
+ }
- if (callback != null) {
- callback.commit();
- }
- }
-
- private ChangeSet packRecords(
- ConsumerRecords records, KafkaCommitCallback callback) {
- ChangeSet changeSet = new ChangeSet();
- changeSet.createdAt = new Date();
- changeSet.changes = records;
- changeSet.callback = callback;
-
- return changeSet;
- }
-
- /**
- * Loop through the {@code ChangeSet} and transform each change data into a {@code Row}.
- *
- * @author yuze
- *
- */
- public abstract class KafkaTransformer extends Transformer {
- @SuppressWarnings("unchecked")
- @Override
- public void fillRowSet(RowSet rowSet) throws BiremeException {
- CommitCallback callback = changeSet.callback;
- HashMap offsets = ((KafkaCommitCallback) callback).partitionOffset;
- Row row = null;
- for (ConsumerRecord change :
- (ConsumerRecords) changeSet.changes) {
- row = new Row();
+ /**
+ * Create a table on the target from the schema carried in the record.
+ * @author zhuhai
+ *
+ * @param row the consumed record
+ * @return true if the table was created
+ */
+ public boolean createTable(Row row) {
+ Connection conn = null;
+ Statement stmt = null;
+ try {
+ conn = BiremeUtility.jdbcConn(cxt.conf.targetDatabase);
+ //conn.setAutoCommit(false);
+ stmt = conn.createStatement();
+ } catch (BiremeException ex) {
+ ex.printStackTrace();
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ }
+ String sql = "create table %s (";
+ String pkSql = "";
+ String finalSQL = String.format(sql, row.table);
+ List<Column> columns = row.def.getColumns();
+ for (Column column : columns) {
+ String type = column.getType();
+ String name = column.getName();
+ String newType = "";
+ if ("int".equals(type)) {
+ newType = "int4";
+ } else if ("varchar".equals(type)) {
+ newType = "varchar(100)";
+ } else if ("char".equals(type)) {
+ newType = "char(2)";
+ } else if ("datetime".equalsIgnoreCase(type)) {
+ newType = "timestamp";
+ } else if ("timestamp".equalsIgnoreCase(type)) {
+ newType = "timestamptz";
+ } else if ("longtext".equalsIgnoreCase(type)) {
+ newType = "text";
+ } else if ("double".equalsIgnoreCase(type)) {
+ newType = "double precision";
+ } else {
+ newType = type;
+ }
+ finalSQL = finalSQL + " \"" + name + "\" " + newType + ",";
+ }
+ finalSQL = finalSQL.substring(0, finalSQL.lastIndexOf(",")) + ");";
+ logger.info("sql: " + finalSQL);
+ List<String> primaryKeys = row.def.getPrimary_key();
+ if (primaryKeys != null && primaryKeys.size() > 0) {
+ String pk = "";
+ // build the comma-separated primary key column list
+ for (String primaryKey : primaryKeys) {
+ if (StringUtils.isEmpty(pk)) {
+ pk = primaryKey;
+ } else {
+ pk = pk + "," + primaryKey + "";
+ }
+ }
+ pkSql = "ALTER TABLE \"" + row.table + "\" ADD CONSTRAINT \"" + row.table + "_pkey\"" + " PRIMARY KEY (" + pk + ")";
+ logger.info("alter_pk: " + pkSql);
+ }
+
+ try {
+ stmt.execute(finalSQL);
+ if (StringUtils.isNotEmpty(pkSql)) {
+ stmt.execute(pkSql);
+ }
+ //conn.commit();
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ } finally {
+ try {
+ if (null != stmt) {
+ stmt.close();
+ }
+ if (null != conn) {
+ conn.close();
+ }
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ }
+ }
+ return tableExists(row.table);
+ }
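+
+ /*
+ * For the sample "def" shown in TableSchema's javadoc, the statements emitted by
+ * createTable above would be roughly (a sketch; actual whitespace differs):
+ *
+ *   create table ddd ( "id" int4, "daemon" varchar(100));
+ *   ALTER TABLE "ddd" ADD CONSTRAINT "ddd_pkey" PRIMARY KEY (id)
+ */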
+
+ /**
+ * Check whether two tables hold the same number of rows, i.e. whether the
+ * data has been fully copied between them.
+ * @author zhuhai
+ *
+ * @param table1 the first table
+ * @param table2 the second table
+ * @return true if the row counts match
+ */
+ public boolean isSyncData(String table1, String table2) {
+ boolean isSync = false;
+ Connection conn = null;
+ Statement pstmt = null;
+ ResultSet rs = null;
+ String sql = String.format("select * from (select count(1) from %s) cnt1, (select count(1) from %s) cnt2", table1, table2);
+ try {
+ conn = BiremeUtility.jdbcConn(cxt.conf.targetDatabase);
+ pstmt = conn.createStatement();
+ rs = pstmt.executeQuery(sql);
+ if (rs.next()) {
+ if (rs.getInt(1) == rs.getInt(2)) {
+ isSync = true;
+ }
+ }
+ } catch (BiremeException ex) {
+ ex.printStackTrace();
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ } finally {
+ try {
+ if (null != rs) {
+ rs.close();
+ }
+ if (null != pstmt) {
+ pstmt.close();
+ }
+ if (null != conn) {
+ conn.close();
+ }
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ }
+ }
+ return isSync;
+ }
- if (!transform(change, row)) {
- continue;
+
+ /**
+ * Alter a table. The strategy is: back up the data into a temporary table,
+ * drop the table, then recreate it from the new schema and re-insert the data.
+ * @author zhuhai
+ *
+ * @param row the consumed record
+ * @param tmpTable the temporary table name
+ * @return true if the alteration succeeded
+ */
+ public boolean alterTable(Row row, String tmpTable) {
+ boolean isAltered = false;
+ Connection conn = null;
+ try {
+ conn = BiremeUtility.jdbcConn(cxt.conf.targetDatabase);
+ conn.setAutoCommit(false);
+ } catch (BiremeException ex) {
+ ex.printStackTrace();
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ }
+ Statement stmt = null;
+ Statement insertData = null;
+ Statement delTemp = null;
+ String duplicate = String.format("select * into %s from %s", tmpTable, row.table);
+ String insert = String.format("insert into %s select * from %s", row.table, tmpTable);
+ String delTempTable = String.format("drop table %s", tmpTable);
+ try {
+ // drop any leftover temporary table from a previous run
+ boolean tmpExists = tableExists(tmpTable) && !dropTable(tmpTable);
+
+ // 1. Back up the data into the temporary table.
+ stmt = conn.createStatement();
+ if (!tmpExists) {
+ stmt.execute(duplicate);
+ conn.commit();
+ }
+
+ // 2. Drop the original table, but only after verifying the temporary copy
+ // holds the same number of rows.
+ boolean isDropped = false;
+ if (isSyncData(row.table, tmpTable)) {
+ if (tableExists(row.table)) {
+ isDropped = dropTable(row.table);
+ }
+ }
+
+ // 3. Recreate the table and re-insert the backed-up data.
+ if (isDropped) {
+ boolean isCreated = createTable(row);
+ if (isCreated) {
+ insertData = conn.createStatement();
+ insertData.execute(insert);
+ conn.commit();
+ if (isSyncData(row.table, tmpTable)) {
+ isAltered = true;
+ // 4. Drop the temporary table.
+ delTemp = conn.createStatement();
+ delTemp.execute(delTempTable);
+ conn.commit();
+ if (tableExists(tmpTable)) {
+ logger.error("临时表\" {} \" 删除失败!", tmpTable);
+ }
+ }
+ }
+ }
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ } finally {
+ try {
+ closeStatement(stmt, insertData, delTemp);
+ if (null != conn) {
+ conn.close();
+ }
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ }
+ }
+ return isAltered;
}
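+
+ /*
+ * Net effect of alterTable for a table t, as a sequence of SQL statements
+ * (a sketch of what the code above issues, in order):
+ *
+ *   select * into t_tmp from t;          -- back up
+ *   drop table t;                        -- drop old definition
+ *   create table t (...);                -- recreate from the new "def"
+ *   insert into t select * from t_tmp;   -- restore data
+ *   drop table t_tmp;                    -- clean up once row counts match
+ */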
- addToRowSet(row, rowSet);
- offsets.put(change.topic() + "+" + change.partition(), change.offset());
- callback.setNewestRecord(row.produceTime);
- }
+ /**
+ * Close the given statements, swallowing any SQLException per statement.
+ * @author zhuhai
+ *
+ * @param statements the statements to close
+ */
+ public void closeStatement(Statement... statements) {
+ for (int i = 0; i < statements.length; i++) {
+ try {
+ if (null != statements[i]) {
+ statements[i].close();
+ }
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ }
+ }
+ }
- callback.setNumOfTables(rowSet.rowBucket.size());
- rowSet.callback = callback;
+ /**
+ * Transform the change data into a {@code Row}.
+ *
+ * @param change the change data
+ * @param row an empty {@code Row} to store the result.
+ * @return {@code true} if the change data was transformed successfully, {@code false} if the
+ * change data is null or filtered
+ * @throws BiremeException when can not get the field
+ */
+ public abstract boolean transform(ConsumerRecord<String, String> change, Row row)
+ throws BiremeException;
}
/**
- * Transform the change data into a {@code Row}.
+ * {@code KafkaCommitCallback} is used to trace a {@code ChangeSet} polled from Kafka. After the
+ * change data has been applied, commit the offset to Kafka.
*
- * @param change the change data
- * @param row an empty {@code Row} to store the result.
- * @return {@code true} if transform the change data successfully, {@code false} it the change
- * data is null or filtered
- * @throws BiremeException when can not get the field
+ * @author yuze
*/
- public abstract boolean transform(ConsumerRecord change, Row row)
- throws BiremeException;
- }
-
- /**
- * {@code KafkaCommitCallback} is used to trace a {@code ChangeSet} polled from Kafka. After the
- * change data has been applied, commit the offset to Kafka.
- *
- * @author yuze
- *
- */
- public class KafkaCommitCallback extends AbstractCommitCallback {
- public HashMap partitionOffset;
- private Timer.Context timerCTX;
- private Date start;
-
- public KafkaCommitCallback() {
- this.partitionOffset = new HashMap();
-
- // record the time being created
- timerCTX = stat.avgDelay.time();
- start = new Date();
- }
+ public class KafkaCommitCallback extends AbstractCommitCallback {
+ public HashMap<String, Long> partitionOffset;
+ private Timer.Context timerCTX;
+ private Date start;
- @Override
- public void commit() {
- HashMap offsets =
- new HashMap();
+ public KafkaCommitCallback() {
+ this.partitionOffset = new HashMap<String, Long>();
+
+ // record the time being created
+ timerCTX = stat.avgDelay.time();
+ start = new Date();
+ }
+
+ @Override
+ public void commit() {
+ HashMap<TopicPartition, OffsetAndMetadata> offsets =
+ new HashMap<TopicPartition, OffsetAndMetadata>();
+
+ partitionOffset.forEach((key, value) -> {
+ String topic = key.split("\\+")[0];
+ int partition = Integer.valueOf(key.split("\\+")[1]);
+ offsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(value + 1));
+ });
+
+ consumer.commitSync(offsets);
+ committed.set(true);
+ partitionOffset.clear();
- partitionOffset.forEach((key, value) -> {
- String topic = key.split("\\+")[0];
- int partition = Integer.valueOf(key.split("\\+")[1]);
- offsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(value + 1));
- });
+ // record the time being committed
+ timerCTX.stop();
- consumer.commitSync(offsets);
- committed.set(true);
- partitionOffset.clear();
+ stat.newestCompleted = newestRecord;
+ stat.delay = new Date().getTime() - start.getTime();
+ }
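+
+ /*
+ * Note on the "value + 1" above: Kafka commit offsets name the *next* record to
+ * consume, so committing the raw offset of the last processed record would make
+ * the consumer re-read that record after a restart.
+ */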
- // record the time being committed
- timerCTX.stop();
+ @Override
+ public void destory() {
+ super.destory();
+ partitionOffset.clear();
+ partitionOffset = null;
+ timerCTX = null;
+ start = null;
+ }
- stat.newestCompleted = newestRecord;
- stat.delay = new Date().getTime() - start.getTime();
}
- @Override
- public void destory() {
- super.destory();
- partitionOffset.clear();
- partitionOffset = null;
- timerCTX = null;
- start = null;
+ /**
+ * Create a new KafkaConsumer, specify the server's ip and port, and groupID.
+ *
+ * @param server ip and port for Kafka server
+ * @param groupID consumer's group id
+ * @return the consumer
+ */
+ public static KafkaConsumer<String, String> createConsumer(String server, String groupID) {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", server);
+ props.put("group.id", groupID);
+ props.put("enable.auto.commit", false);
+ props.put("session.timeout.ms", 60000);
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("auto.offset.reset", "earliest");
+ return new KafkaConsumer<String, String>(props);
}
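+
+ /*
+ * Usage sketch (hypothetical values): createConsumer("localhost:9092", "bireme").
+ * Because enable.auto.commit is false, offsets only advance when
+ * KafkaCommitCallback.commit() runs, so records that were polled but not yet
+ * applied are re-delivered after a crash rather than lost.
+ */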
- }
-
- /**
- * Create a new KafkaConsumer, specify the server's ip and port, and groupID.
- *
- * @param server ip and port for Kafka server
- * @param groupID consumer's group id
- * @return the consumer
- */
- public static KafkaConsumer createConsumer(String server, String groupID) {
- Properties props = new Properties();
- props.put("bootstrap.servers", server);
- props.put("group.id", groupID);
- props.put("enable.auto.commit", false);
- props.put("session.timeout.ms", 60000);
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("auto.offset.reset", "earliest");
- return new KafkaConsumer(props);
- }
}
diff --git a/src/main/java/cn/hashdata/bireme/pipeline/MaxwellPipeLine.java b/src/main/java/cn/hashdata/bireme/pipeline/MaxwellPipeLine.java
index 7d5ebdf..0e9ec2d 100644
--- a/src/main/java/cn/hashdata/bireme/pipeline/MaxwellPipeLine.java
+++ b/src/main/java/cn/hashdata/bireme/pipeline/MaxwellPipeLine.java
@@ -1,172 +1,282 @@
package cn.hashdata.bireme.pipeline;
-import java.util.Arrays;
-import java.util.HashMap;
-
+import java.lang.reflect.InvocationTargetException;
+import java.util.*;
+
+import cn.hashdata.bireme.*;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.codec.binary.Base64;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.logging.log4j.LogManager;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-
-import cn.hashdata.bireme.BiremeException;
-import cn.hashdata.bireme.BiremeUtility;
-import cn.hashdata.bireme.Context;
-import cn.hashdata.bireme.Record;
-import cn.hashdata.bireme.Row;
-import cn.hashdata.bireme.Table;
import cn.hashdata.bireme.Row.RowType;
+import cn.hashdata.bireme.Row.DDLType;
/**
* {@code MaxwellPipeLine} is a kind of {@code KafkaPipeLine} whose change data coming from Maxwell.
*
* @author yuze
- *
*/
public class MaxwellPipeLine extends KafkaPipeLine {
- public MaxwellPipeLine(Context cxt, SourceConfig conf, int id) {
- super(cxt, conf, "Maxwell-" + conf.name + "-" + conf.topic + "-" + id);
- consumer.subscribe(Arrays.asList(conf.topic));
- logger = LogManager.getLogger("Bireme." + myName);
- logger.info("Create new Maxwell Pipeline. Name: {}", myName);
- }
-
- @Override
- public Transformer createTransformer() {
- return new MaxwellTransformer();
- }
-
- /**
- * {@code MaxwellChangeTransformer} is a type of {@code Transformer}. It is used to transform data
- * to {@code Row} from Maxwell data source.
- *
- * @author yuze
- *
- */
- class MaxwellTransformer extends KafkaTransformer {
- HashMap tableMap;
-
- public MaxwellTransformer() {
- super();
- tableMap = conf.tableMap;
- }
-
- private String getMappedTableName(MaxwellRecord record) {
- return cxt.tableMap.get(record.dataSource + "." + record.database + "." + record.table);
- }
-
- private String getOriginTableName(MaxwellRecord record) {
- return record.dataSource + "." + record.database + "." + record.table;
- }
-
- private boolean filter(MaxwellRecord record) {
- String fullTableName = record.dataSource + "." + record.database + "." + record.table;
-
- if (!tableMap.containsKey(fullTableName)) {
- return true;
- }
-
- return false;
+ public MaxwellPipeLine(Context cxt, SourceConfig conf, int id) {
+ super(cxt, conf, "Maxwell-" + conf.name + "-" + conf.topic + "-" + id);
+ consumer.subscribe(Arrays.asList(conf.topic));
+ logger = LogManager.getLogger("Bireme." + myName);
+ logger.info("Create new Maxwell Pipeline. Name: {}", myName);
}
@Override
- protected byte[] decodeToBinary(String data) {
- byte[] decoded = null;
- decoded = Base64.decodeBase64(data);
- return decoded;
+ public Transformer createTransformer() {
+ return new MaxwellTransformer();
}
- @Override
- protected String decodeToBit(String data, int precision) {
- String binaryStr = Integer.toBinaryString(Integer.valueOf(data));
- return String.format("%" + precision + "s", binaryStr).replace(' ', '0');
- }
-
- @Override
- public boolean transform(ConsumerRecord change, Row row)
- throws BiremeException {
- MaxwellRecord record = new MaxwellRecord(change.value());
-
- if (filter(record)) {
- return false;
- }
+ /**
+ * {@code MaxwellChangeTransformer} is a type of {@code Transformer}. It is used to transform data
+ * to {@code Row} from Maxwell data source.
+ *
+ * @author yuze
+ */
+ class MaxwellTransformer extends KafkaTransformer {
+ HashMap<String, String> tableMap;
+
+ public MaxwellTransformer() {
+ super();
+ tableMap = conf.tableMap;
+ }
- Table table = cxt.tablesInfo.get(getMappedTableName(record));
+ private String getMappedTableName(MaxwellRecord record) {
+ return cxt.tableMap.get(record.dataSource + "." + record.database + "." + record.table);
+ }
- row.type = record.type;
- row.produceTime = record.produceTime;
- row.originTable = getOriginTableName(record);
- row.mappedTable = getMappedTableName(record);
- row.keys = formatColumns(record, table, table.keyNames, false);
+ private String getOriginTableName(MaxwellRecord record) {
+ return record.dataSource + "." + record.database + "." + record.table;
+ }
- if (row.type == RowType.INSERT || row.type == RowType.UPDATE) {
- row.tuple = formatColumns(record, table, table.columnName, false);
- }
+ private boolean filter(MaxwellRecord record) {
+ String fullTableName = record.dataSource + "." + record.database + "." + record.table;
- if (row.type == RowType.UPDATE) {
- row.oldKeys = formatColumns(record, table, table.keyNames, true);
+ if (!tableMap.containsKey(fullTableName)) {
+ return true;
+ }
- if (row.keys.equals(row.oldKeys)) {
- row.oldKeys = null;
+ return false;
}
- }
- return true;
- }
-
- class MaxwellRecord implements Record {
- public String dataSource;
- public String database;
- public String table;
- public Long produceTime;
- public RowType type;
- public JsonObject data;
- public JsonObject old;
-
- public MaxwellRecord(String changeValue) {
- JsonParser jsonParser = new JsonParser();
- JsonObject value = (JsonObject) jsonParser.parse(changeValue);
-
- this.dataSource = getPipeLineName();
- this.database = value.get("database").getAsString();
- this.table = value.get("table").getAsString();
- this.produceTime = value.get("ts").getAsLong() * 1000;
- this.data = value.get("data").getAsJsonObject();
-
- if (value.has("old") && !value.get("old").isJsonNull()) {
- this.old = value.get("old").getAsJsonObject();
+ @Override
+ protected byte[] decodeToBinary(String data) {
+ byte[] decoded = null;
+ decoded = Base64.decodeBase64(data);
+ return decoded;
}
- switch (value.get("type").getAsString()) {
- case "insert":
- type = RowType.INSERT;
- break;
-
- case "update":
- type = RowType.UPDATE;
- break;
-
- case "delete":
- type = RowType.DELETE;
- break;
+ @Override
+ protected String decodeToBit(String data, int precision) {
+ String binaryStr = Integer.toBinaryString(Integer.valueOf(data));
+ return String.format("%" + precision + "s", binaryStr).replace(' ', '0');
}
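+
+ /*
+ * Example: decodeToBit("5", 4) gives Integer.toBinaryString(5) = "101",
+ * left-padded to the column precision -> "0101".
+ */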
- }
-
- @Override
- public String getField(String fieldName, boolean oldValue) throws BiremeException {
- String field = null;
-
- if (oldValue) {
- try {
- field = BiremeUtility.jsonGetIgnoreCase(old, fieldName);
- return field;
- } catch (BiremeException ignore) {
- }
+
+ @Override
+ public boolean transform(ConsumerRecord<String, String> change, Row row)
+ throws BiremeException {
+ MaxwellRecord record = new MaxwellRecord(change.value());
+
+ // Only non-DDL rows (ddlType == null) go through the table-mapping filter;
+ // DDL rows must always pass through so the schema change can be applied.
+ if (null == record.ddlType) {
+ if (filter(record)) {
+ return false;
+ }
+ }
+
+ row.type = record.type;
+ row.ddlType = record.ddlType;
+ row.produceTime = record.produceTime;
+ row.originTable = getOriginTableName(record);
+ row.mappedTable = getMappedTableName(record);
+ row.ddlSQL = record.ddlSQL;
+ row.def = record.def;
+ row.database = record.database;
+ row.table = record.table;
+ row.isDDL = record.isDDL;
+
+ if (row.ddlType == null || row.ddlSQL == null) {
+ Table table = cxt.tablesInfo.get(getMappedTableName(record));
+ row.keys = formatColumns(record, table, table.keyNames, false);
+
+ // inserts and updates carry the full tuple
+ if (row.type == RowType.INSERT || row.type == RowType.UPDATE) {
+ row.tuple = formatColumns(record, table, table.columnName, false);
+ }
+
+ // updates additionally need the old key values
+ if (row.type == RowType.UPDATE) {
+ row.oldKeys = formatColumns(record, table, table.keyNames, true);
+
+ if (row.keys.equals(row.oldKeys)) {
+ row.oldKeys = null;
+ }
+ }
+ }
+ return true;
}
- return BiremeUtility.jsonGetIgnoreCase(data, fieldName);
- }
+ class MaxwellRecord implements Record {
+ public String dataSource;
+ public String database;
+ public String table;
+ public Long produceTime;
+ public RowType type;
+ public JSONObject data;
+ public JSONObject old;
+
+ /**
+ * DDL metadata parsed from the event.
+ */
+ public DDLType ddlType;
+ public String ddlSQL;
+ public TableSchema def;
+ public boolean isDDL = false;
+
+ /**
+ * Parse a Maxwell change event.
+ *
+ * @param changeValue the raw JSON string consumed from Kafka
+ */
+ public MaxwellRecord(String changeValue) {
+ JSONObject value = JSONObject.parseObject(changeValue);
+
+ this.dataSource = getPipeLineName();
+ this.database = value.getString("database");
+ this.table = value.getString("table");
+ this.produceTime = value.getLong("ts") * 1000;
+ // When "data" is absent (e.g. for DDL events), fall back to an empty JSON
+ // object to avoid a NullPointerException later on.
+ if (value.containsKey("data")) {
+ this.data = value.getJSONObject("data");
+ } else {
+ this.data = new JSONObject();
+ }
+
+ if (value.containsKey("old") && !value.getJSONObject("old").isEmpty()) {
+ this.old = value.getJSONObject("old");
+ }
+
+ // extract the DDL SQL, if present
+ if (value.containsKey("sql") && !value.getString("sql").isEmpty()) {
+ this.ddlSQL = value.getString("sql");
+ }
+ if (value.containsKey("def") && !value.getJSONObject("def").isEmpty()) {
+ // extract the table schema: columns and primary key
+ JSONObject defObject = value.getJSONObject("def");
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setDatabase(defObject.getString("database"));
+ tableSchema.setTable(defObject.getString("table"));
+ tableSchema.setCharset(defObject.getString("charset"));
+ JSONArray columnsJson = defObject.getJSONArray("columns");
+ List<String> primaryKeysList = new ArrayList<String>();
+ if (defObject.containsKey("primary-key")) {
+ String primaryKeyStr = defObject.get("primary-key").toString();
+ logger.info(this.table + " --> primary key: " + primaryKeyStr);
+ if (!primaryKeyStr.isEmpty()) {
+ if (primaryKeyStr.contains("[")) {
+ primaryKeyStr = primaryKeyStr.replaceAll("\\[|\\]", "");
+ if (primaryKeyStr.contains(",")) {
+ String[] primaryKeyArr = primaryKeyStr.split(",");
+ for (int i = 0; i < primaryKeyArr.length; i++) {
+ primaryKeysList.add(primaryKeyArr[i]);
+ }
+ } else {
+ primaryKeysList.add(primaryKeyStr);
+ }
+ } else {
+ primaryKeysList.add(primaryKeyStr);
+ }
+ }
+ }
+ List<Column> columns = new ArrayList<Column>();
+ try {
+ for (Object obj : columnsJson) {
+ Column column = new Column();
+ // JSONObject implements Map, so BeanUtils copies its entries onto the bean
+ BeanUtils.copyProperties(column, obj);
+ columns.add(column);
+ }
+ } catch (IllegalAccessException ex) {
+ ex.printStackTrace();
+ } catch (InvocationTargetException ex) {
+ ex.printStackTrace();
+ }
+ tableSchema.setColumns(columns);
+ tableSchema.setPrimary_key(primaryKeysList);
+ this.def = tableSchema;
+ }
+
+ switch (value.getString("type")) {
+ case "insert":
+ type = RowType.INSERT;
+ break;
+ case "update":
+ type = RowType.UPDATE;
+ break;
+ case "delete":
+ type = RowType.DELETE;
+ break;
+ case "database-create":
+ isDDL = true;
+ ddlType = DDLType.CREATE_DATABASE;
+ break;
+ case "database-drop":
+ isDDL = true;
+ ddlType = DDLType.DROP_DATABASE;
+ break;
+ case "database-alter":
+ isDDL = true;
+ ddlType = DDLType.ALTER_DATABASE;
+ break;
+ case "table-create":
+ isDDL = true;
+ ddlType = DDLType.CREATE_TABLE;
+ break;
+ case "table-drop":
+ isDDL = true;
+ ddlType = DDLType.DROP_TABLE;
+ break;
+ case "table-alter":
+ isDDL = true;
+ ddlType = DDLType.ALTER_TABLE;
+ break;
+ }
+ }
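+
+ /*
+ * Illustrative DML event this constructor parses (sample values only):
+ *
+ *   {"database":"demo","table":"test1","type":"update","ts":1552000000,
+ *    "data":{"id":1,"name":"new"},"old":{"name":"old"}}
+ *
+ * DDL events instead carry "type":"table-create" (etc.), a "sql" string and,
+ * for table create/alter, the "def" schema block handled above.
+ */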
+
+ @Override
+ public String getField(String fieldName, boolean oldValue) throws BiremeException {
+ String field = null;
+
+ if (oldValue) {
+ try {
+ field = BiremeUtility.jsonGetIgnoreCase(old, fieldName);
+ return field;
+ } catch (BiremeException ignore) {
+ }
+ }
+
+ return BiremeUtility.jsonGetIgnoreCase(data, fieldName);
+ }
+ }
}
- }
}
diff --git a/src/main/java/cn/hashdata/bireme/pipeline/PipeLine.java b/src/main/java/cn/hashdata/bireme/pipeline/PipeLine.java
index 558586b..5b8c209 100644
--- a/src/main/java/cn/hashdata/bireme/pipeline/PipeLine.java
+++ b/src/main/java/cn/hashdata/bireme/pipeline/PipeLine.java
@@ -34,473 +34,471 @@
*
*
* @author yuze
- *
*/
public abstract class PipeLine implements Callable {
- public enum PipeLineState { NORMAL, ERROR }
-
- public Logger logger;
-
- public String myName;
- public volatile PipeLineState state;
- public Exception e;
- public PipeLineStat stat;
-
- public Context cxt;
- public SourceConfig conf;
-
- public LinkedBlockingQueue> transResult;
- private LinkedList localTransformer;
+ public enum PipeLineState { NORMAL, ERROR }
- private Dispatcher dispatcher;
+ public Logger logger;
- public ConcurrentHashMap cache;
+ public String myName;
+ public volatile PipeLineState state;
+ public Exception e;
+ public PipeLineStat stat;
- public PipeLine(Context cxt, SourceConfig conf, String myName) {
- this.myName = myName;
- this.state = PipeLineState.NORMAL;
- this.e = null;
+ public Context cxt;
+ public SourceConfig conf;
- this.cxt = cxt;
- this.conf = conf;
+ public LinkedBlockingQueue<Future<RowSet>> transResult;
+ private LinkedList<Transformer> localTransformer;
- int queueSize = cxt.conf.transform_queue_size;
+ private Dispatcher dispatcher;
- transResult = new LinkedBlockingQueue>(queueSize);
- localTransformer = new LinkedList();
+ public ConcurrentHashMap<String, RowCache> cache;
- cache = new ConcurrentHashMap();
+ public PipeLine(Context cxt, SourceConfig conf, String myName) {
+ this.myName = myName;
+ this.state = PipeLineState.NORMAL;
+ this.e = null;
- dispatcher = new Dispatcher(cxt, this);
+ this.cxt = cxt;
+ this.conf = conf;
- for (int i = 0; i < queueSize; i++) {
- localTransformer.add(createTransformer());
- }
+ int queueSize = cxt.conf.transform_queue_size;
- // initialize statistics
- this.stat = new PipeLineStat(this);
- }
+ transResult = new LinkedBlockingQueue<Future<RowSet>>(queueSize);
+ localTransformer = new LinkedList<Transformer>();
- @Override
- public PipeLine call() {
- try {
- executePipeline();
- } catch (Exception e) {
- state = PipeLineState.ERROR;
- this.e = e;
+ cache = new ConcurrentHashMap<String, RowCache>();
- logger.error("Execute Pipeline failed: {}", e.getMessage());
- logger.error("Stack Trace: ", e);
- }
+ dispatcher = new Dispatcher(cxt, this);
- return this;
- }
+ for (int i = 0; i < queueSize; i++) {
+ localTransformer.add(createTransformer());
+ }
- private PipeLine executePipeline() {
- // Poll data and start transformer
- if (transData() == false) {
- return this;
+ // initialize statistics
+ this.stat = new PipeLineStat(this);
}
- // Start dispatcher, only one dispatcher for each pipeline
- if (startDispatch() == false) {
- return this;
- }
+ @Override
+ public PipeLine call() {
+ try {
+ executePipeline();
+ } catch (Exception e) {
+ state = PipeLineState.ERROR;
+ this.e = e;
+
+ logger.error("Execute Pipeline failed: {}", e.getMessage());
+ logger.error("Stack Trace: ", e);
+ }
- // Start merger
- if (startMerge() == false) {
- return this;
+ return this;
}
- checkAndCommit(); // Commit result
- return this;
- }
-
- private boolean transData() {
- while (transResult.remainingCapacity() != 0) {
- ChangeSet changeSet = null;
-
- try {
- changeSet = pollChangeSet();
- } catch (BiremeException e) {
- state = PipeLineState.ERROR;
- this.e = e;
-
- logger.error("Poll change set failed. Message: {}", e.getMessage());
- logger.error("Stack Trace: ", e);
+ private PipeLine executePipeline() {
+ // Poll data and start transformer
+ if (transData() == false) {
+ return this;
+ }
- return false;
- }
+ // Start dispatcher, only one dispatcher for each pipeline
+ if (startDispatch() == false) {
+ return this;
+ }
- if (changeSet == null) {
- break;
- }
+ // Start merger
+ if (startMerge() == false) {
+ return this;
+ }
- Transformer trans = localTransformer.remove();
- trans.setChangeSet(changeSet);
- startTransform(trans);
- localTransformer.add(trans);
+ checkAndCommit(); // Commit result
+ return this;
}
- return true;
- }
+ private boolean transData() {
+ while (transResult.remainingCapacity() != 0) {
+ ChangeSet changeSet = null;
- private boolean startDispatch() {
- try {
- dispatcher.dispatch();
- } catch (BiremeException e) {
- state = PipeLineState.ERROR;
- this.e = e;
+ try {
+ changeSet = pollChangeSet();
+ } catch (BiremeException e) {
+ state = PipeLineState.ERROR;
+ this.e = e;
- logger.error("Dispatch failed. Message: {}", e.getMessage());
- logger.error("Stack Trace: ", e);
+ logger.error("Poll change set failed. Message: {}", e.getMessage());
+ logger.error("Stack Trace: ", e);
- return false;
+ return false;
+ }
- } catch (InterruptedException e) {
- state = PipeLineState.ERROR;
- this.e = new BiremeException("Dispatcher failed, be interrupted", e);
+ if (changeSet == null) {
+ break;
+ }
- logger.info("Interrupted when getting transform result. Message: {}.", e.getMessage());
- logger.info("Stack Trace: ", e);
+ Transformer trans = localTransformer.remove();
+ trans.setChangeSet(changeSet);
+ startTransform(trans);
+ localTransformer.add(trans);
+ }
- return false;
+ return true;
}
- return true;
- }
-
- private boolean startMerge() {
- for (RowCache rowCache : cache.values()) {
- if (rowCache.shouldMerge()) {
- rowCache.startMerge();
- }
-
- try {
- rowCache.startLoad();
+ private boolean startDispatch() {
+ try {
+ dispatcher.dispatch();
+ } catch (BiremeException e) {
+ state = PipeLineState.ERROR;
+ this.e = e;
- } catch (BiremeException e) {
- state = PipeLineState.ERROR;
- this.e = e;
+ logger.error("Dispatch failed. Message: {}", e.getMessage());
+ logger.error("Stack Trace: ", e);
- logger.info("Loader for {} failed. Message: {}.", rowCache.tableName, e.getMessage());
- logger.info("Stack Trace: ", e);
+ return false;
- return false;
+ } catch (InterruptedException e) {
+ state = PipeLineState.ERROR;
+ this.e = new BiremeException("Dispatcher failed, be interrupted", e);
- } catch (InterruptedException e) {
- state = PipeLineState.ERROR;
- this.e = new BiremeException("Get Future failed, be interrupted", e);
+ logger.info("Interrupted when getting transform result. Message: {}.", e.getMessage());
+ logger.info("Stack Trace: ", e);
- logger.info("Interrupted when getting loader result for {}. Message: {}.",
- rowCache.tableName, e.getMessage());
- logger.info("Stack Trace: ", e);
-
- return false;
- }
- }
+ return false;
+ }
- return true;
- }
-
- /**
- * Poll a set of change data from source and pack it to {@link ChangeSet}.
- *
- * @return a packed change set
- * @throws BiremeException Exceptions when poll data from source
- */
- public abstract ChangeSet pollChangeSet() throws BiremeException;
-
- /**
- * Check whether the loading operation is complete. If true, commit it.
- *
- */
- public abstract void checkAndCommit();
-
- /**
- * Create a new {@link Transformer} to work parallel.
- *
- * @return a new {@code Transformer}
- */
- public abstract Transformer createTransformer();
-
- private void startTransform(Transformer trans) {
- ExecutorService transformerPool = cxt.transformerPool;
- Future result = transformerPool.submit(trans);
- transResult.add(result);
- }
-
- /**
- * Get the unique name for the {@code PipeLine}.
- *
- * @return the name for the {@code PipeLine}
- */
- public String getPipeLineName() {
- return conf.name;
- }
-
- /**
- * {@code Transformer} convert a group of change data to unified form {@link Row}.
- *
- * @author yuze
- *
- */
- public abstract class Transformer implements Callable {
- private static final char FIELD_DELIMITER = '|';
- private static final char NEWLINE = '\n';
- private static final char QUOTE = '"';
- private static final char ESCAPE = '\\';
-
- public ChangeSet changeSet;
- public StringBuilder tupleStringBuilder;
- public StringBuilder fieldStringBuilder;
-
- public Transformer() {
- tupleStringBuilder = new StringBuilder();
- fieldStringBuilder = new StringBuilder();
+ return true;
}
- /**
- * Borrow an empty {@code RowSet} and write the data acquired from {@code ChangeSet} to the
- * {@code RowSet}. Finally, return the filled {@code RowSet}.
- *
- * @throws BiremeException when unable to transform the recoed
- */
- @Override
- public RowSet call() throws BiremeException {
- RowSet rowSet = new RowSet();
-
- fillRowSet(rowSet);
+ private boolean startMerge() {
+ for (RowCache rowCache : cache.values()) {
+ if (rowCache.shouldMerge()) {
+ rowCache.startMerge();
+ }
- changeSet.destory();
- changeSet = null;
+ try {
+ rowCache.startLoad();
- return rowSet;
- }
+ } catch (BiremeException e) {
+ state = PipeLineState.ERROR;
+ this.e = e;
- /**
- * Format the change data into csv tuple, which is then loaded to database by COPY.
- *
- * @param record contain change data polled by {@code Provider}.
- * @param table metadata of the target table
- * @param columns the indexes of columns to assemble a csv tuple
- * @param oldValue only for update operation when primary key was updated, we need to get the
- * old key and delete the old tuple
- * @return the csv tuple in string
- * @throws BiremeException when can not get the field value
- */
- protected String formatColumns(Record record, Table table, ArrayList columns,
- boolean oldValue) throws BiremeException {
- tupleStringBuilder.setLength(0);
-
- for (int i = 0; i < columns.size(); ++i) {
- String columnName = columns.get(i);
- int sqlType = table.columnType.get(columnName);
- String data = null;
-
- data = record.getField(columnName, oldValue);
- if (data != null) {
- switch (sqlType) {
- case Types.CHAR:
- case Types.NCHAR:
- case Types.VARCHAR:
- case Types.LONGVARCHAR:
- case Types.NVARCHAR:
- case Types.LONGNVARCHAR: {
- tupleStringBuilder.append(QUOTE);
- tupleStringBuilder.append(escapeString(data));
- tupleStringBuilder.append(QUOTE);
-
- break;
- }
-
- case Types.BINARY:
- case Types.BLOB:
- case Types.CLOB:
- case Types.LONGVARBINARY:
- case Types.NCLOB:
- case Types.VARBINARY: {
- byte[] decoded = null;
- decoded = decodeToBinary(data);
- tupleStringBuilder.append(escapeBinary(decoded));
- break;
- }
+ logger.info("Loader for {} failed. Message: {}.", rowCache.tableName, e.getMessage());
+ logger.info("Stack Trace: ", e);
- case Types.BIT: {
- int precision = table.columnPrecision.get(columnName);
- tupleStringBuilder.append(decodeToBit(data, precision));
- break;
- }
+ return false;
- case Types.DATE:
- case Types.TIME:
- case Types.TIMESTAMP: {
- int scale = table.columnScale.get(columnName);
- String time = decodeToTime(data, sqlType, scale);
- tupleStringBuilder.append(time);
- break;
- }
+ } catch (InterruptedException e) {
+ state = PipeLineState.ERROR;
+ this.e = new BiremeException("Get Future failed, be interrupted", e);
- case Types.DECIMAL:
- case Types.NUMERIC: {
- int scale = table.columnScale.get(columnName);
- String numeric = decodeToNumeric(data, sqlType, scale);
- tupleStringBuilder.append(numeric);
- break;
- }
+ logger.info("Interrupted when getting loader result for {}. Message: {}.",
+ rowCache.tableName, e.getMessage());
+ logger.info("Stack Trace: ", e);
- default: {
- tupleStringBuilder.append(data);
- break;
+ return false;
}
- }
}
- if (i + 1 < columns.size()) {
- tupleStringBuilder.append(FIELD_DELIMITER);
- }
- }
- tupleStringBuilder.append(NEWLINE);
- return tupleStringBuilder.toString();
+ return true;
}
/**
- * For binary type, {@code Transformer} need to decode the extracted string and transform it to
- * origin binary.
+ * Poll a set of change data from source and pack it to {@link ChangeSet}.
*
- * @param data the encoded string
- * @return the array of byte, decode result
+ * @return a packed change set
+ * @throws BiremeException Exceptions when poll data from source
*/
- protected abstract byte[] decodeToBinary(String data);
+ public abstract ChangeSet pollChangeSet() throws BiremeException;
/**
- * For bit type, {@code Transformer} need to decode the extracted string and transform it to
- * origin bit.
- *
- * @param data the encoded string
- * @param precision the length of the bit field, acquired from the table's metadata
- * @return the string of 1 or 0
+ * Check whether the loading operation is complete. If true, commit it.
*/
- protected abstract String decodeToBit(String data, int precision);
+ public abstract void checkAndCommit();
/**
- * For Date/Time type, {@code Transformer} need to decode the extracted string and transform it
- * to origin Date/Time string.
+ * Create a new {@link Transformer} to work parallel.
*
- * @param data the encoded string from provider
- * @param sqlType particular type of this field, such as Time, Date
- * @param precision specifies the number of fractional digits retained in the seconds field
- * @return the Date/Time format
+ * @return a new {@code Transformer}
*/
- protected String decodeToTime(String data, int sqlType, int precision) {
- return data;
- };
+ public abstract Transformer createTransformer();
+
+ private void startTransform(Transformer trans) {
+ ExecutorService transformerPool = cxt.transformerPool;
+ Future<RowSet> result = transformerPool.submit(trans);
+ transResult.add(result);
+ }
/**
- * For Numeric type, {@code Transformer} need to decode the extracted string and transform it to
- * origin Numeric in String.
+ * Get the unique name for the {@code PipeLine}.
*
- * @param data the value from provider
- * @param sqlType particular type of this field
- * @param precision the count of decimal digits in the fractional part
- * @return the numeric number in String
+ * @return the name for the {@code PipeLine}
*/
- protected String decodeToNumeric(String data, int sqlType, int precision) {
- return data;
- };
+ public String getPipeLineName() {
+ return conf.name;
+ }
/**
- * Add escape character to a data string.
+ * {@code Transformer} converts a group of change data to the unified form {@link Row}.
*
- * @param data the origin string
- * @return the modified string
+ * @author yuze
*/
- protected String escapeString(String data) {
- fieldStringBuilder.setLength(0);
-
- for (int i = 0; i < data.length(); ++i) {
- char c = data.charAt(i);
-
- switch (c) {
- case 0x00:
- logger.warn("illegal character 0x00, deleted.");
- continue;
- case QUOTE:
- case ESCAPE:
- fieldStringBuilder.append(ESCAPE);
+ public abstract class Transformer implements Callable<RowSet> {
+ private static final char FIELD_DELIMITER = '|';
+ private static final char NEWLINE = '\n';
+ private static final char QUOTE = '"';
+ private static final char ESCAPE = '\\';
+
+ public ChangeSet changeSet;
+ public StringBuilder tupleStringBuilder;
+ public StringBuilder fieldStringBuilder;
+
+ public Transformer() {
+ tupleStringBuilder = new StringBuilder();
+ fieldStringBuilder = new StringBuilder();
}
- fieldStringBuilder.append(c);
- }
+ /**
+ * Borrow an empty {@code RowSet} and write the data acquired from {@code ChangeSet} to the
+ * {@code RowSet}. Finally, return the filled {@code RowSet}.
+ *
+ * @throws BiremeException when unable to transform the record
+ */
+ @Override
+ public RowSet call() throws BiremeException {
+ RowSet rowSet = new RowSet();
- return fieldStringBuilder.toString();
- }
+ fillRowSet(rowSet);
- /**
- * Encode the binary data into string for COPY into target database.
- *
- * @param data the origin binary data
- * @return the encoded string
- */
- protected String escapeBinary(byte[] data) {
- fieldStringBuilder.setLength(0);
-
- for (int i = 0; i < data.length; ++i) {
- if (data[i] == '\\') {
- fieldStringBuilder.append('\\');
- fieldStringBuilder.append('\\');
- } else if (data[i] < 0x20 || data[i] > 0x7e) {
- byte b = data[i];
- char[] val = new char[3];
- val[2] = (char) ((b & 07) + '0');
- b >>= 3;
- val[1] = (char) ((b & 07) + '0');
- b >>= 3;
- val[0] = (char) ((b & 03) + '0');
- fieldStringBuilder.append('\\');
- fieldStringBuilder.append(val);
- } else {
- fieldStringBuilder.append((char) (data[i]));
+ changeSet.destory();
+ changeSet = null;
+
+ return rowSet;
}
- }
- return fieldStringBuilder.toString();
- }
+ /**
+ * Format the change data into a csv tuple, which is then loaded into the database by COPY.
+ *
+ * @param record contains the change data polled by the {@code Provider}
+ * @param table metadata of the target table
+ * @param columns the names of the columns used to assemble a csv tuple
+ * @param oldValue only for update operations where the primary key was updated; we need to get
+ * the old key and delete the old tuple
+ * @return the csv tuple as a string
+ * @throws BiremeException when a field value can not be retrieved
+ */
+ protected String formatColumns(Record record, Table table, ArrayList columns,
+ boolean oldValue) throws BiremeException {
+ tupleStringBuilder.setLength(0);
+
+ for (int i = 0; i < columns.size(); ++i) {
+ String columnName = columns.get(i);
+ int sqlType = table.columnType.get(columnName);
+ String data = null;
+
+ data = record.getField(columnName, oldValue);
+ if (data != null) {
+ switch (sqlType) {
+ case Types.CHAR:
+ case Types.NCHAR:
+ case Types.VARCHAR:
+ case Types.LONGVARCHAR:
+ case Types.NVARCHAR:
+ case Types.LONGNVARCHAR: {
+ tupleStringBuilder.append(QUOTE);
+ tupleStringBuilder.append(escapeString(data));
+ tupleStringBuilder.append(QUOTE);
+
+ break;
+ }
+
+ case Types.BINARY:
+ case Types.BLOB:
+ case Types.CLOB:
+ case Types.LONGVARBINARY:
+ case Types.NCLOB:
+ case Types.VARBINARY: {
+ byte[] decoded = null;
+ decoded = decodeToBinary(data);
+ tupleStringBuilder.append(escapeBinary(decoded));
+ break;
+ }
+
+ case Types.BIT: {
+ int precision = table.columnPrecision.get(columnName);
+ tupleStringBuilder.append(decodeToBit(data, precision));
+ break;
+ }
+
+ case Types.DATE:
+ case Types.TIME:
+ case Types.TIMESTAMP: {
+ int scale = table.columnScale.get(columnName);
+ String time = decodeToTime(data, sqlType, scale);
+ tupleStringBuilder.append(time);
+ break;
+ }
+
+ case Types.DECIMAL:
+ case Types.NUMERIC: {
+ int scale = table.columnScale.get(columnName);
+ String numeric = decodeToNumeric(data, sqlType, scale);
+ tupleStringBuilder.append(numeric);
+ break;
+ }
+
+ default: {
+ tupleStringBuilder.append(data);
+ break;
+ }
+ }
+ }
+ if (i + 1 < columns.size()) {
+ tupleStringBuilder.append(FIELD_DELIMITER);
+ }
+ }
+ tupleStringBuilder.append(NEWLINE);
- /**
- * Appoint a {@code ChangeSet} to the {@code Transformer}
- *
- * @param changeSet a package of change data
- */
- public void setChangeSet(ChangeSet changeSet) {
- this.changeSet = changeSet;
- }
+ return tupleStringBuilder.toString();
+ }
- /**
- * Write the change data into a {@code RowSet}.
- *
- * @param rowSet a empty {@code RowSet} to store change data
- * @throws BiremeException Exceptions when fill the {@code RowSet}
- */
- public abstract void fillRowSet(RowSet rowSet) throws BiremeException;
+ /**
+ * For binary types, {@code Transformer} needs to decode the extracted string back into the
+ * original binary data.
+ *
+ * @param data the encoded string
+ * @return the decoded byte array
+ */
+ protected abstract byte[] decodeToBinary(String data);
+
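+ // A subclass might, for example, decode base64 here (a sketch, assuming the
+ // provider base64-encodes binary fields):
+ //   protected byte[] decodeToBinary(String data) {
+ //     return java.util.Base64.getDecoder().decode(data);
+ //   }
+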
+ /**
+ * For bit types, {@code Transformer} needs to decode the extracted string back into the
+ * original bit value.
+ *
+ * @param data the encoded string
+ * @param precision the length of the bit field, acquired from the table's metadata
+ * @return the bit value as a string of 0s and 1s
+ */
+ protected abstract String decodeToBit(String data, int precision);
+
+ /**
+ * For Date/Time types, {@code Transformer} needs to decode the extracted string back into
+ * the original Date/Time string.
+ *
+ * @param data the encoded string from the provider
+ * @param sqlType the particular type of this field, such as Time or Date
+ * @param precision specifies the number of fractional digits retained in the seconds field
+ * @return the Date/Time value as a string
+ */
+ protected String decodeToTime(String data, int sqlType, int precision) {
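+ // default: pass the value through unchanged; subclasses override this when the
+ // source encodes Date/Time differently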
+ return data;
+ }
- /**
- * After convert a single change data to a {@code Row}, insert into the {@code RowSet}.
- *
- * @param row the converted change data
- * @param rowSet the {@code RowSet} to organize the {@code Row}
- */
- public void addToRowSet(Row row, RowSet rowSet) {
- HashMap<String, ArrayList<Row>> bucket = rowSet.rowBucket;
- String mappedTable = row.mappedTable;
- ArrayList<Row> array = bucket.get(mappedTable);
+
+ /**
+ * For Numeric types, {@code Transformer} needs to decode the extracted string back into the
+ * original numeric value in String form.
+ *
+ * @param data the value from the provider
+ * @param sqlType the particular type of this field
+ * @param precision the count of decimal digits in the fractional part
+ * @return the numeric value as a string
+ */
+ protected String decodeToNumeric(String data, int sqlType, int precision) {
+ return data;
+ }
+
+ /**
+ * Add escape character to a data string.
+ *
+ * @param data the origin string
+ * @return the modified string
+ */
+ protected String escapeString(String data) {
+ fieldStringBuilder.setLength(0);
+
+ for (int i = 0; i < data.length(); ++i) {
+ char c = data.charAt(i);
+
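+ // 0x00 cannot be stored in a PostgreSQL text field, so it is dropped; quote and
+ // escape characters are prefixed with the escape character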
+ switch (c) {
+ case 0x00:
+ logger.warn("illegal character 0x00, deleted.");
+ continue;
+ case QUOTE:
+ case ESCAPE:
+ fieldStringBuilder.append(ESCAPE);
+ }
+
+ fieldStringBuilder.append(c);
+ }
- if (array == null) {
- array = new ArrayList<Row>();
- bucket.put(mappedTable, array);
- }
+ return fieldStringBuilder.toString();
+ }
- array.add(row);
+ /**
+ * Encode the binary data into string for COPY into target database.
+ *
+ * @param data the origin binary data
+ * @return the encoded string
+ */
+ protected String escapeBinary(byte[] data) {
+ fieldStringBuilder.setLength(0);
+
+ for (int i = 0; i < data.length; ++i) {
+ if (data[i] == '\\') {
+ fieldStringBuilder.append('\\');
+ fieldStringBuilder.append('\\');
+ } else if (data[i] < 0x20 || data[i] > 0x7e) {
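+ // non-printable byte: emit a three-digit octal escape (\nnn) understood by COPY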
+ byte b = data[i];
+ char[] val = new char[3];
+ val[2] = (char) ((b & 07) + '0');
+ b >>= 3;
+ val[1] = (char) ((b & 07) + '0');
+ b >>= 3;
+ val[0] = (char) ((b & 03) + '0');
+ fieldStringBuilder.append('\\');
+ fieldStringBuilder.append(val);
+ } else {
+ fieldStringBuilder.append((char) (data[i]));
+ }
+ }
+
+ return fieldStringBuilder.toString();
+ }
+
+ /**
+ * Assign a {@code ChangeSet} to this {@code Transformer}.
+ *
+ * @param changeSet a package of change data
+ */
+ public void setChangeSet(ChangeSet changeSet) {
+ this.changeSet = changeSet;
+ }
+
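+ // Typical driver flow (a sketch; the real scheduling lives in the pipeline code):
+ //   transformer.setChangeSet(changeSet);
+ //   RowSet rows = transformer.call();
+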
+ /**
+ * Write the change data into a {@code RowSet}.
+ *
+ * @param rowSet an empty {@code RowSet} to store the change data
+ * @throws BiremeException when filling the {@code RowSet} fails
+ */
+ public abstract void fillRowSet(RowSet rowSet) throws BiremeException;
+
+ /**
+ * After converting a single change record to a {@code Row}, insert it into the {@code RowSet}.
+ *
+ * @param row the converted change data
+ * @param rowSet the {@code RowSet} to organize the {@code Row}
+ */
+ public void addToRowSet(Row row, RowSet rowSet) {
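+ // rows are bucketed by mapped target table so the loader can batch per table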
+ HashMap<String, ArrayList<Row>> bucket = rowSet.rowBucket;
+ String mappedTable = row.mappedTable;
+ ArrayList<Row> array = bucket.get(mappedTable);
+ if (array == null) {
+ array = new ArrayList<Row>();
+ bucket.put(mappedTable, array);
+ }
+ array.add(row);
+ }
}
- }
+
}
diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml
index cb06886..ef0ec9f 100644
--- a/src/main/resources/log4j2.xml
+++ b/src/main/resources/log4j2.xml
@@ -7,13 +7,53 @@
+    <!-- additional appender and logger definitions (XML content not recoverable) -->