Skip to content

Commit ecb4ae1

Browse files
vitess-bot[bot]mhamza15
authored andcommitted
[release-21.0] VReplication: Support reversing read-only traffic in vtctldclient (vitessio#16920) (vitessio#17015)
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
1 parent b92a072 commit ecb4ae1

File tree

9 files changed

+1102
-101
lines changed

9 files changed

+1102
-101
lines changed

go/test/endtoend/vreplication/helper_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,11 @@ func executeOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletPro
339339

340340
func assertQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {
341341
t.Helper()
342+
rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules")
343+
require.NoError(t, err)
342344
count0, body0, count1, body1 := executeOnTablet(t, conn, tablet, ksName, query, matchQuery)
343-
assert.Equalf(t, count0+1, count1, "query %q did not execute in target;\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\n", query, matchQuery, body0, body1)
345+
require.Equalf(t, count0+1, count1, "query %q did not execute on destination %s (%s-%d);\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\nrouting rules:\n%s\n\n",
346+
query, ksName, tablet.Cell, tablet.TabletUID, matchQuery, body0, body1, rr)
344347
}
345348

346349
func assertQueryDoesNotExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {

go/test/endtoend/vreplication/movetables_buffering_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,14 @@ import (
1212
)
1313

1414
func TestMoveTablesBuffering(t *testing.T) {
15-
defaultRdonly = 1
15+
ogReplicas := defaultReplicas
16+
ogRdOnly := defaultRdonly
17+
defer func() {
18+
defaultReplicas = ogReplicas
19+
defaultRdonly = ogRdOnly
20+
}()
21+
defaultRdonly = 0
22+
defaultReplicas = 0
1623
vc = setupMinimalCluster(t)
1724
defer vc.TearDown()
1825

go/test/endtoend/vreplication/resharding_workflows_v2_test.go

Lines changed: 97 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"encoding/json"
2121
"fmt"
2222
"net"
23+
"slices"
2324
"strconv"
2425
"strings"
2526
"testing"
@@ -31,9 +32,11 @@ import (
3132

3233
"vitess.io/vitess/go/test/endtoend/cluster"
3334
"vitess.io/vitess/go/vt/log"
35+
"vitess.io/vitess/go/vt/topo/topoproto"
3436
"vitess.io/vitess/go/vt/wrangler"
3537

3638
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
39+
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
3740
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
3841
)
3942

@@ -163,9 +166,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
163166
args = append(args, "--tablet-types", tabletTypes)
164167
}
165168
args = append(args, "--action_timeout=10m") // At this point something is up so fail the test
166-
if debugMode {
167-
t.Logf("Executing workflow command: vtctldclient %v", strings.Join(args, " "))
168-
}
169+
t.Logf("Executing workflow command: vtctldclient %s", strings.Join(args, " "))
169170
output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...)
170171
lastOutput = output
171172
if err != nil {
@@ -326,27 +327,45 @@ func tstWorkflowCancel(t *testing.T) error {
326327
return tstWorkflowAction(t, workflowActionCancel, "", "")
327328
}
328329

329-
func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.VttabletProcess) {
330-
if tabletTypes == "" {
331-
tabletTypes = "replica,rdonly"
330+
func validateReadsRoute(t *testing.T, tabletType string, tablet *cluster.VttabletProcess) {
331+
if tablet == nil {
332+
return
332333
}
333334
vtgateConn, closeConn := getVTGateConn()
334335
defer closeConn()
335-
for _, tt := range []string{"replica", "rdonly"} {
336-
destination := fmt.Sprintf("%s:%s@%s", tablet.Keyspace, tablet.Shard, tt)
337-
if strings.Contains(tabletTypes, tt) {
338-
readQuery := "select * from customer"
339-
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, readQuery)
340-
}
341-
}
336+
// We do NOT want to target a shard as that goes around the routing rules and
337+
// defeats the purpose here. We are using a query w/o a WHERE clause so for
338+
// sharded keyspaces it should hit all shards as a SCATTER query. So all we
339+
// care about is the keyspace and tablet type.
340+
destination := fmt.Sprintf("%s@%s", tablet.Keyspace, strings.ToLower(tabletType))
341+
readQuery := "select cid from customer limit 50"
342+
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1")
342343
}
343344

344345
func validateReadsRouteToSource(t *testing.T, tabletTypes string) {
345-
validateReadsRoute(t, tabletTypes, sourceReplicaTab)
346+
tt, err := topoproto.ParseTabletTypes(tabletTypes)
347+
require.NoError(t, err)
348+
if slices.Contains(tt, topodatapb.TabletType_REPLICA) {
349+
require.NotNil(t, sourceReplicaTab)
350+
validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), sourceReplicaTab)
351+
}
352+
if slices.Contains(tt, topodatapb.TabletType_RDONLY) {
353+
require.NotNil(t, sourceRdonlyTab)
354+
validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), sourceRdonlyTab)
355+
}
346356
}
347357

348358
func validateReadsRouteToTarget(t *testing.T, tabletTypes string) {
349-
validateReadsRoute(t, tabletTypes, targetReplicaTab1)
359+
tt, err := topoproto.ParseTabletTypes(tabletTypes)
360+
require.NoError(t, err)
361+
if slices.Contains(tt, topodatapb.TabletType_REPLICA) {
362+
require.NotNil(t, targetReplicaTab1)
363+
validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), targetReplicaTab1)
364+
}
365+
if slices.Contains(tt, topodatapb.TabletType_RDONLY) {
366+
require.NotNil(t, targetRdonlyTab1)
367+
validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), targetRdonlyTab1)
368+
}
350369
}
351370

352371
func validateWritesRouteToSource(t *testing.T) {
@@ -396,6 +415,13 @@ func getCurrentStatus(t *testing.T) string {
396415
// but CI currently fails on creating multiple clusters even after the previous ones are torn down
397416

398417
func TestBasicV2Workflows(t *testing.T) {
418+
ogReplicas := defaultReplicas
419+
ogRdOnly := defaultRdonly
420+
defer func() {
421+
defaultReplicas = ogReplicas
422+
defaultRdonly = ogRdOnly
423+
}()
424+
defaultReplicas = 1
399425
defaultRdonly = 1
400426
extraVTTabletArgs = []string{
401427
parallelInsertWorkers,
@@ -633,6 +659,12 @@ func testPartialSwitches(t *testing.T) {
633659
tstWorkflowSwitchReads(t, "", "")
634660
checkStates(t, nextState, nextState) // idempotency
635661

662+
tstWorkflowReverseReads(t, "replica,rdonly", "")
663+
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)
664+
665+
tstWorkflowSwitchReads(t, "", "")
666+
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
667+
636668
tstWorkflowSwitchWrites(t)
637669
currentState = nextState
638670
nextState = wrangler.WorkflowStateAllSwitched
@@ -669,12 +701,12 @@ func testRestOfWorkflow(t *testing.T) {
669701
waitForLowLag(t, "customer", "wf1")
670702
tstWorkflowSwitchReads(t, "", "")
671703
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
672-
validateReadsRouteToTarget(t, "replica")
704+
validateReadsRouteToTarget(t, "replica,rdonly")
673705
validateWritesRouteToSource(t)
674706

675707
tstWorkflowSwitchWrites(t)
676708
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateAllSwitched)
677-
validateReadsRouteToTarget(t, "replica")
709+
validateReadsRouteToTarget(t, "replica,rdonly")
678710
validateWritesRouteToTarget(t)
679711

680712
// this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces
@@ -685,42 +717,45 @@ func testRestOfWorkflow(t *testing.T) {
685717
waitForLowLag(t, keyspace, "wf1_reverse")
686718
tstWorkflowReverseReads(t, "", "")
687719
checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched)
688-
validateReadsRouteToSource(t, "replica")
720+
validateReadsRouteToSource(t, "replica,rdonly")
689721
validateWritesRouteToTarget(t)
690722

691723
tstWorkflowReverseWrites(t)
692724
checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)
693-
validateReadsRouteToSource(t, "replica")
725+
validateReadsRouteToSource(t, "replica,rdonly")
694726
validateWritesRouteToSource(t)
695727

696728
waitForLowLag(t, "customer", "wf1")
697729
tstWorkflowSwitchWrites(t)
698730
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateWritesSwitched)
699-
validateReadsRouteToSource(t, "replica")
731+
validateReadsRouteToSource(t, "replica,rdonly")
700732
validateWritesRouteToTarget(t)
701733

702734
waitForLowLag(t, keyspace, "wf1_reverse")
703735
tstWorkflowReverseWrites(t)
704-
validateReadsRouteToSource(t, "replica")
736+
checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)
737+
validateReadsRouteToSource(t, "replica,rdonly")
705738
validateWritesRouteToSource(t)
706739

707740
waitForLowLag(t, "customer", "wf1")
708741
tstWorkflowSwitchReads(t, "", "")
709-
validateReadsRouteToTarget(t, "replica")
742+
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
743+
validateReadsRouteToTarget(t, "replica,rdonly")
710744
validateWritesRouteToSource(t)
711745

712746
tstWorkflowReverseReads(t, "", "")
713-
validateReadsRouteToSource(t, "replica")
747+
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)
748+
validateReadsRouteToSource(t, "replica,rdonly")
714749
validateWritesRouteToSource(t)
715750

716751
tstWorkflowSwitchReadsAndWrites(t)
717-
validateReadsRouteToTarget(t, "replica")
718-
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
752+
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)
753+
validateReadsRouteToTarget(t, "replica,rdonly")
719754
validateWritesRouteToTarget(t)
720755
waitForLowLag(t, keyspace, "wf1_reverse")
721756
tstWorkflowReverseReadsAndWrites(t)
722-
validateReadsRoute(t, "rdonly", sourceRdonlyTab)
723-
validateReadsRouteToSource(t, "replica")
757+
checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched)
758+
validateReadsRouteToSource(t, "replica,rdonly")
724759
validateWritesRouteToSource(t)
725760

726761
// trying to complete an unswitched workflow should error
@@ -731,8 +766,7 @@ func testRestOfWorkflow(t *testing.T) {
731766
// fully switch and complete
732767
waitForLowLag(t, "customer", "wf1")
733768
tstWorkflowSwitchReadsAndWrites(t)
734-
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
735-
validateReadsRouteToTarget(t, "replica")
769+
validateReadsRouteToTarget(t, "replica,rdonly")
736770
validateWritesRouteToTarget(t)
737771

738772
err = tstWorkflowComplete(t)
@@ -787,7 +821,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {
787821

788822
zone1 := vc.Cells["zone1"]
789823

790-
vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
824+
vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)
791825

792826
verifyClusterHealth(t, vc)
793827
insertInitialData(t)
@@ -800,7 +834,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {
800834
func setupMinimalCustomerKeyspace(t *testing.T) map[string]*cluster.VttabletProcess {
801835
tablets := make(map[string]*cluster.VttabletProcess)
802836
if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, "customer", "-80,80-",
803-
customerVSchema, customerSchema, 0, 0, 200, nil); err != nil {
837+
customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, nil); err != nil {
804838
t.Fatal(err)
805839
}
806840
defaultCell := vc.Cells[vc.CellNames[0]]
@@ -936,6 +970,7 @@ func createAdditionalCustomerShards(t *testing.T, shards string) {
936970
targetTab2 = custKs.Shards["80-c0"].Tablets["zone1-600"].Vttablet
937971
targetTab1 = custKs.Shards["40-80"].Tablets["zone1-500"].Vttablet
938972
targetReplicaTab1 = custKs.Shards["-40"].Tablets["zone1-401"].Vttablet
973+
targetRdonlyTab1 = custKs.Shards["-40"].Tablets["zone1-402"].Vttablet
939974

940975
sourceTab = custKs.Shards["-80"].Tablets["zone1-200"].Vttablet
941976
sourceReplicaTab = custKs.Shards["-80"].Tablets["zone1-201"].Vttablet
@@ -947,3 +982,34 @@ func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) {
947982
"--sql", sql, keyspace)
948983
require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err))
949984
}
985+
986+
func validateTableRoutingRule(t *testing.T, table, tabletType, fromKeyspace, toKeyspace string) {
987+
tabletType = strings.ToLower(strings.TrimSpace(tabletType))
988+
rr := getRoutingRules(t)
989+
// We set matched = true by default because it is possible, if --no-routing-rules is set while creating
990+
// a workflow, that the routing rules are empty when the workflow starts.
991+
// We set it to false below when the rule is found, but before matching the routed keyspace.
992+
matched := true
993+
for _, r := range rr.GetRules() {
994+
fromRule := fmt.Sprintf("%s.%s", fromKeyspace, table)
995+
if tabletType != "" && tabletType != "primary" {
996+
fromRule = fmt.Sprintf("%s@%s", fromRule, tabletType)
997+
}
998+
if r.FromTable == fromRule {
999+
// We found the rule, so we can set matched to false here and check for the routed keyspace below.
1000+
matched = false
1001+
require.NotEmpty(t, r.ToTables)
1002+
toTable := r.ToTables[0]
1003+
// The ToTables value is of the form "routedKeyspace.table".
1004+
routedKeyspace, routedTable, ok := strings.Cut(toTable, ".")
1005+
require.True(t, ok)
1006+
require.Equal(t, table, routedTable)
1007+
if routedKeyspace == toKeyspace {
1008+
// We found the rule, the table and keyspace matches, so our search is done.
1009+
matched = true
1010+
break
1011+
}
1012+
}
1013+
}
1014+
require.Truef(t, matched, "routing rule for %s.%s from %s to %s not found", fromKeyspace, table, tabletType, toKeyspace)
1015+
}

0 commit comments

Comments
 (0)