Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions api/v1/search/mongodbsearch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ import (
)

const (
MongotDefaultPort = 27027
MongotDefaultWireprotoPort = 27027
MongotDefaultGrpcPort = 27028
MongotDefaultMetricsPort = 9946
MongotDefautHealthCheckPort = 8080
MongotDefaultSyncSourceUsername = "search-sync-source"

ForceWireprotoAnnotation = "mongodb.com/v1.force-search-wireproto"
)

func init() {
Expand Down Expand Up @@ -207,8 +210,12 @@ func (s *MongoDBSearch) GetMongoDBResourceRef() *userv1.MongoDBResourceRef {
return &mdbResourceRef
}

func (s *MongoDBSearch) GetMongotPort() int32 {
return MongotDefaultPort
func (s *MongoDBSearch) GetMongotWireprotoPort() int32 {
return MongotDefaultWireprotoPort
}

func (s *MongoDBSearch) GetMongotGrpcPort() int32 {
return MongotDefaultGrpcPort
}

func (s *MongoDBSearch) GetMongotMetricsPort() int32 {
Expand Down Expand Up @@ -241,3 +248,18 @@ func (s *MongoDBSearch) GetLogLevel() mdb.LogLevel {

return s.Spec.LogLevel
}

// mongot configuration defaults to the gRPC server. on rare occasions we might advise users to enable the legacy
// wireproto server. Once the deprecated wireproto server is removed, this function, annotation, and all code guarded
// by this check should be removed.
func (s *MongoDBSearch) IsWireprotoEnabled() bool {
val, ok := s.Annotations[ForceWireprotoAnnotation]
return ok && val == "true"
}

func (s *MongoDBSearch) GetEffectiveMongotPort() int32 {
if s.IsWireprotoEnabled() {
return s.GetMongotWireprotoPort()
}
return s.GetMongotGrpcPort()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
kind: feature
date: 2025-10-30
---

* **MongoDBSearch**: Switch to gRPC and mTLS for internal communication
Since MCK 1.4 the `mongod` and `mongot` processess communicated using the MongoDB Wire Protocol and used keyfile authentication. This release switches that to gRPC with mTLS authentication. gRPC will allow for load-balancing search queries against multiple `mongot` processes in the future, and mTLS decouples the internal cluster authentication mode and credentials among `mongod` processes from the connection to the `mongot` process. The Operator will automatically enable gRPC for existing and new workloads, and will enable mTLS authentication if both Database Server and `MongoDBSearch` resource are configured for TLS.
Comment on lines +1 to +7
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGMT!

2 changes: 1 addition & 1 deletion controllers/om/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ func (d Deployment) GetAllProcessNames() (names []string) {
for _, p := range d.getProcesses() {
names = append(names, p.Name())
}
return
return names
}

func (d Deployment) getProcesses() []Process {
Expand Down
6 changes: 5 additions & 1 deletion controllers/operator/mongodbsearch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ func (r *MongoDBSearchReconciler) Reconcile(ctx context.Context, request reconci
return reconcile.Result{RequeueAfter: time.Second * util.RetryTimeSec}, err
}

r.watch.AddWatchedResourceIfNotAdded(searchSource.KeyfileSecretName(), mdbSearch.Namespace, watch.Secret, mdbSearch.NamespacedName())
if mdbSearch.IsWireprotoEnabled() {
log.Info("Enabling the mongot wireproto server as required by annotation")
// the keyfile secret is necessary for wireproto authentication
r.watch.AddWatchedResourceIfNotAdded(searchSource.KeyfileSecretName(), mdbSearch.Namespace, watch.Secret, mdbSearch.NamespacedName())
}

// Watch for changes in database source CA certificate secrets or configmaps
tlsSourceConfig := searchSource.TLSConfig()
Expand Down
108 changes: 75 additions & 33 deletions controllers/operator/mongodbsearch_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package operator
import (
"context"
"fmt"
"strconv"
"testing"

"github.com/ghodss/yaml"
Expand Down Expand Up @@ -100,6 +101,19 @@ func buildExpectedMongotConfig(search *searchv1.MongoDBSearch, mdbc *mdbcv1.Mong
if search.Spec.LogLevel != "" {
logLevel = string(search.Spec.LogLevel)
}

var wireprotoServer *mongot.ConfigWireproto
if search.IsWireprotoEnabled() {
wireprotoServer = &mongot.ConfigWireproto{
Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotWireprotoPort()),
Authentication: &mongot.ConfigAuthentication{
Mode: "keyfile",
KeyFile: searchcontroller.TempKeyfilePath,
},
TLS: &mongot.ConfigWireprotoTLS{Mode: mongot.ConfigTLSModeDisabled},
}
}

return mongot.Config{
SyncSource: mongot.ConfigSyncSource{
ReplicaSet: mongot.ConfigReplicaSet{
Expand All @@ -115,14 +129,11 @@ func buildExpectedMongotConfig(search *searchv1.MongoDBSearch, mdbc *mdbcv1.Mong
DataPath: searchcontroller.MongotDataPath,
},
Server: mongot.ConfigServer{
Wireproto: &mongot.ConfigWireproto{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we still keep a wireproto test as long as it is possible to enabled it?

Address: "0.0.0.0:27027",
Authentication: &mongot.ConfigAuthentication{
Mode: "keyfile",
KeyFile: searchcontroller.TempKeyfilePath,
},
TLS: mongot.ConfigTLS{Mode: mongot.ConfigTLSModeDisabled},
Grpc: &mongot.ConfigGrpc{
Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotGrpcPort()),
TLS: &mongot.ConfigGrpcTLS{Mode: mongot.ConfigTLSModeDisabled},
},
Wireproto: wireprotoServer,
},
Metrics: mongot.ConfigMetrics{
Enabled: true,
Expand Down Expand Up @@ -168,35 +179,66 @@ func TestMongoDBSearchReconcile_MissingSource(t *testing.T) {

func TestMongoDBSearchReconcile_Success(t *testing.T) {
ctx := context.Background()
search := newMongoDBSearch("search", mock.TestNamespace, "mdb")
search.Spec.LogLevel = "WARN"

mdbc := newMongoDBCommunity("mdb", mock.TestNamespace)
reconciler, c := newSearchReconciler(mdbc, search)

res, err := reconciler.Reconcile(
ctx,
reconcile.Request{NamespacedName: types.NamespacedName{Name: search.Name, Namespace: search.Namespace}},
)
expected, _ := workflow.OK().ReconcileResult()
assert.NoError(t, err)
assert.Equal(t, expected, res)

svc := &corev1.Service{}
err = c.Get(ctx, search.SearchServiceNamespacedName(), svc)
assert.NoError(t, err)
tests := []struct {
name string
withWireproto bool
}{
{
name: "grpc only (default)",
withWireproto: false,
},
{
name: "grpc + wireproto via annotation",
withWireproto: true,
},
}

cm := &corev1.ConfigMap{}
err = c.Get(ctx, search.MongotConfigConfigMapNamespacedName(), cm)
assert.NoError(t, err)
expectedConfig := buildExpectedMongotConfig(search, mdbc)
configYaml, err := yaml.Marshal(expectedConfig)
assert.NoError(t, err)
assert.Equal(t, string(configYaml), cm.Data[searchcontroller.MongotConfigFilename])
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
search := newMongoDBSearch("search", mock.TestNamespace, "mdb")
search.Spec.LogLevel = "WARN"
search.Annotations = map[string]string{
searchv1.ForceWireprotoAnnotation: strconv.FormatBool(tc.withWireproto),
}

sts := &appsv1.StatefulSet{}
err = c.Get(ctx, search.StatefulSetNamespacedName(), sts)
assert.NoError(t, err)
mdbc := newMongoDBCommunity("mdb", mock.TestNamespace)
reconciler, c := newSearchReconciler(mdbc, search)

res, err := reconciler.Reconcile(
ctx,
reconcile.Request{NamespacedName: types.NamespacedName{Name: search.Name, Namespace: search.Namespace}},
)
expected, _ := workflow.OK().ReconcileResult()
assert.NoError(t, err)
assert.Equal(t, expected, res)

svc := &corev1.Service{}
err = c.Get(ctx, search.SearchServiceNamespacedName(), svc)
assert.NoError(t, err)
servicePortNames := []string{}
for _, port := range svc.Spec.Ports {
servicePortNames = append(servicePortNames, port.Name)
}
expectedPortNames := []string{"mongot-grpc", "metrics", "healthcheck"}
if tc.withWireproto {
expectedPortNames = append(expectedPortNames, "mongot-wireproto")
}
assert.ElementsMatch(t, expectedPortNames, servicePortNames)

cm := &corev1.ConfigMap{}
err = c.Get(ctx, search.MongotConfigConfigMapNamespacedName(), cm)
assert.NoError(t, err)
expectedConfig := buildExpectedMongotConfig(search, mdbc)
configYaml, err := yaml.Marshal(expectedConfig)
assert.NoError(t, err)
assert.Equal(t, string(configYaml), cm.Data[searchcontroller.MongotConfigFilename])

sts := &appsv1.StatefulSet{}
err = c.Get(ctx, search.StatefulSetNamespacedName(), sts)
assert.NoError(t, err)
})
}
}

func checkSearchReconcileFailed(
Expand Down
Loading