diff --git a/vreplgen/README.md b/vreplgen/README.md new file mode 100644 index 0000000..eb800af --- /dev/null +++ b/vreplgen/README.md @@ -0,0 +1,43 @@ +A golang CLI utility to generate vtctlclient commands to add vreplication +rules: + +``` +Usage: vreplgen [-on_ddl (ignore|stop|exec|exec_ignore)] 'filter1' [ 'filter2']... +``` + +E.g.: + +``` +./vreplgen cell-0000000001 main -80 main_copy transactionhistory 'select * from transactionhistory where in_keyrange(merchant_id, "hash", "80-")' +``` + +The utility also supports multiple table filters, which allows multiple tables +to be specified in a single vreplication stream (good for if you have +a lot of tables you want to process via vreplication). E.g.: + +``` +./vreplgen cell-0000000001 main -80 main_copy transactionhistory 'select * from transactionhistory where in_keyrange(merchant_id, "hash", "80-")' transactionhistory2 'select * from transactionhistory2 where in_keyrange(merchant_id, "hash", "-80")' +``` + +An important thing to note is that a single vreplication stream cannot use +the same source table in the same stream. The utility will not prevent +you from doing this, however. + +`vreplgen` assumes you are running vtctld on localhost port 15999. If not, +you can set your VTCTLCLIENT environment variable to the vtctlclient command +you want `vreplgen` to generate, e.g.: + +``` +export VTCTLCLIENT="vtctlclient -server vtctld:15999" +``` + +Lastly, you can control the on_ddl flag using vreplgen. The default if you +do not specify the `-on_ddl` option is `ignore`, but you can specify: + + * `-on_ddl ignore` + * `-on_ddl stop` + * `-on_ddl exec` + * `-on_ddl exec_ignore` + +depending on how you want your DDL to be handled in your replication streams. +See the main vreplication documentation for more details. diff --git a/vreplgen/vreplgen.go b/vreplgen/vreplgen.go index de7f2ad..570f1db 100644 --- a/vreplgen/vreplgen.go +++ b/vreplgen/vreplgen.go @@ -23,35 +23,71 @@ package main import ( "bytes" + "flag" "fmt" + "os" "strings" "vitess.io/vitess/go/sqltypes" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) +var onDDL string + +func init() { + flag.StringVar(&onDDL, "on_ddl", "ignore", "Set on_ddl value for replication stream - ignore, stop, exec, exec_ignore") +} + func main() { - vtctl := "./lvtctl.sh" - tabletID := "test-400" - dbName := "vt_merchant" + flag.Parse() + + if len(os.Args) < 9 { + fmt.Println("Usage: vreplgen [-on_ddl (ignore|stop|exec|exec_ignore)] 'filter1' [ 'filter2']...") + os.Exit(1) + } + + vtctl := os.Getenv("VTCTLCLIENT") + if vtctl == "" { + vtctl = "vtctlclient -server localhost:15999" + } + + // First, we process fixed positional arguments + // such as the intended target and source + tabletID := os.Args[3] + sourceKeyspace := os.Args[4] + sourceShard := os.Args[5] + destKeyspace := os.Args[6] + destDbName := destKeyspace + var rules []*binlogdatapb.Rule + + // Next, we iterate over all possible rules + // Note this can be a variable number! + for i := 7; i < len(os.Args); i = i + 2 { + destTable := os.Args[i] + destFilter := os.Args[i+1] + rule := new(binlogdatapb.Rule) + rule.Match = destTable + rule.Filter = destFilter + rules = append(rules, rule) + } filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "morder", - Filter: "select * from uorder where in_keyrange(mname, 'unicode_loose_md5', '-80')", - }}, + Rules: rules, } + + onDDLAction := binlogdatapb.OnDDLAction(binlogdatapb.OnDDLAction_value[strings.ToUpper(onDDL)]) + bls := &binlogdatapb.BinlogSource{ - Keyspace: "user", - Shard: "-80", + Keyspace: sourceKeyspace, + Shard: sourceShard, Filter: filter, - OnDdl: binlogdatapb.OnDDLAction_IGNORE, + OnDdl: onDDLAction, } val := sqltypes.NewVarBinary(fmt.Sprintf("%v", bls)) var sqlEscaped bytes.Buffer val.EncodeSQL(&sqlEscaped) query := fmt.Sprintf("insert into _vt.vreplication "+ "(db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values"+ - "('%s', %s, '', 9999, 9999, 'master', 0, 0, 'Running')", dbName, sqlEscaped.String()) + "('%s', %s, '', 9999, 9999, 'master', 0, 0, 'Running')", destDbName, sqlEscaped.String()) fmt.Printf("%s VReplicationExec %s '%s'\n", vtctl, tabletID, strings.Replace(query, "'", "'\"'\"'", -1)) }