Skip to content

Commit 3ec055e

Browse files
committed
[Search] Implement gRPC and mTLS
1 parent 87b203f commit 3ec055e

16 files changed

+170
-188
lines changed

api/v1/search/mongodbsearch_types.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ import (
1717
)
1818

1919
const (
20-
MongotDefaultPort = 27027
20+
MongotDefaultWireprotoPort = 27027
21+
MongotDefaultGrpcPort = 27028
2122
MongotDefaultMetricsPort = 9946
2223
MongotDefautHealthCheckPort = 8080
2324
MongotDefaultSyncSourceUsername = "search-sync-source"
25+
26+
ForceWireprotoTransportAnnotation = "mongodb.com/v1.force-wireproto-transport"
2427
)
2528

2629
func init() {
@@ -207,8 +210,12 @@ func (s *MongoDBSearch) GetMongoDBResourceRef() *userv1.MongoDBResourceRef {
207210
return &mdbResourceRef
208211
}
209212

210-
func (s *MongoDBSearch) GetMongotPort() int32 {
211-
return MongotDefaultPort
213+
func (s *MongoDBSearch) GetMongotWireprotoPort() int32 {
214+
return MongotDefaultWireprotoPort
215+
}
216+
217+
func (s *MongoDBSearch) GetMongotGrpcPort() int32 {
218+
return MongotDefaultGrpcPort
212219
}
213220

214221
func (s *MongoDBSearch) GetMongotMetricsPort() int32 {
@@ -241,3 +248,8 @@ func (s *MongoDBSearch) GetLogLevel() mdb.LogLevel {
241248

242249
return s.Spec.LogLevel
243250
}
251+
252+
func (s *MongoDBSearch) IsWireprotoForced() bool {
253+
val, ok := s.Annotations[ForceWireprotoTransportAnnotation]
254+
return ok && val == "true"
255+
}

api/v1/search/zz_generated.deepcopy.go

Lines changed: 6 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

controllers/om/deployment.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -874,7 +874,7 @@ func (d Deployment) GetAllProcessNames() (names []string) {
874874
for _, p := range d.getProcesses() {
875875
names = append(names, p.Name())
876876
}
877-
return
877+
return names
878878
}
879879

880880
func (d Deployment) getProcesses() []Process {

controllers/operator/mongodbsearch_controller.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ func (r *MongoDBSearchReconciler) Reconcile(ctx context.Context, request reconci
5757
return reconcile.Result{RequeueAfter: time.Second * util.RetryTimeSec}, err
5858
}
5959

60-
r.watch.AddWatchedResourceIfNotAdded(searchSource.KeyfileSecretName(), mdbSearch.Namespace, watch.Secret, mdbSearch.NamespacedName())
60+
if mdbSearch.IsWireprotoForced() {
61+
log.Info("Enabling the mongot wireproto server as required by annotation")
62+
// the keyfile secret is necessary for wireproto authentication
63+
r.watch.AddWatchedResourceIfNotAdded(searchSource.KeyfileSecretName(), mdbSearch.Namespace, watch.Secret, mdbSearch.NamespacedName())
64+
}
6165

6266
// Watch for changes in database source CA certificate secrets or configmaps
6367
tlsSourceConfig := searchSource.TLSConfig()

controllers/operator/mongodbsearch_controller_test.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,9 @@ func buildExpectedMongotConfig(search *searchv1.MongoDBSearch, mdbc *mdbcv1.Mong
115115
DataPath: searchcontroller.MongotDataPath,
116116
},
117117
Server: mongot.ConfigServer{
118-
Wireproto: &mongot.ConfigWireproto{
119-
Address: "0.0.0.0:27027",
120-
Authentication: &mongot.ConfigAuthentication{
121-
Mode: "keyfile",
122-
KeyFile: searchcontroller.TempKeyfilePath,
123-
},
124-
TLS: mongot.ConfigTLS{Mode: mongot.ConfigTLSModeDisabled},
118+
Grpc: &mongot.ConfigGrpc{
119+
Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotGrpcPort()),
120+
TLS: &mongot.ConfigGrpcTLS{Mode: mongot.ConfigTLSModeDisabled},
125121
},
126122
},
127123
Metrics: mongot.ConfigMetrics{

controllers/searchcontroller/mongodbsearch_reconcile_helper.go

Lines changed: 70 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,15 @@ func (r *MongoDBSearchReconcileHelper) reconcile(ctx context.Context, log *zap.S
9797
return workflow.Failed(err)
9898
}
9999

100-
keyfileStsModification, err := r.ensureSourceKeyfile(ctx, log)
101-
if apierrors.IsNotFound(err) {
102-
return workflow.Pending("Waiting for keyfile secret to be created")
103-
} else if err != nil {
104-
return workflow.Failed(err)
100+
keyfileStsModification := statefulset.NOOP()
101+
if r.mdbSearch.IsWireprotoForced() {
102+
var err error
103+
keyfileStsModification, err = r.ensureSourceKeyfile(ctx, log)
104+
if apierrors.IsNotFound(err) {
105+
return workflow.Pending("Waiting for keyfile secret to be created")
106+
} else if err != nil {
107+
return workflow.Failed(err)
108+
}
105109
}
106110

107111
if err := r.ensureSearchService(ctx, r.mdbSearch); err != nil {
@@ -113,11 +117,9 @@ func (r *MongoDBSearchReconcileHelper) reconcile(ctx context.Context, log *zap.S
113117
return workflow.Failed(err)
114118
}
115119

116-
egressTlsMongotModification, egressTlsStsModification, err := r.ensureEgressTlsConfig(ctx)
117-
if err != nil {
118-
return workflow.Failed(err)
119-
}
120+
egressTlsMongotModification, egressTlsStsModification := r.ensureEgressTlsConfig(ctx)
120121

122+
// the egress TLS modification needs to always be applied after the ingress one, because it toggles mTLS based on the mode set by the ingress modification
121123
configHash, err := r.ensureMongotConfig(ctx, log, createMongotConfig(r.mdbSearch, r.db), ingressTlsMongotModification, egressTlsMongotModification)
122124
if err != nil {
123125
return workflow.Failed(err)
@@ -140,6 +142,7 @@ func (r *MongoDBSearchReconcileHelper) reconcile(ctx context.Context, log *zap.S
140142
return workflow.OK()
141143
}
142144

145+
// This is called only if the wireproto server is enabled, to set up they keyfile necessary for authentication.
143146
func (r *MongoDBSearchReconcileHelper) ensureSourceKeyfile(ctx context.Context, log *zap.SugaredLogger) (statefulset.Modification, error) {
144147
keyfileSecretName := kube.ObjectKey(r.mdbSearch.GetNamespace(), r.db.KeyfileSecretName())
145148
keyfileSecret := &corev1.Secret{}
@@ -154,6 +157,7 @@ func (r *MongoDBSearchReconcileHelper) ensureSourceKeyfile(ctx context.Context,
154157
"keyfileHash": hashBytes(keyfileSecret.Data[MongotKeyfileFilename]),
155158
},
156159
)),
160+
CreateKeyfileModificationFunc(r.db.KeyfileSecretName()),
157161
), nil
158162
}
159163

@@ -229,10 +233,7 @@ func (r *MongoDBSearchReconcileHelper) ensureMongotConfig(ctx context.Context, l
229233

230234
func (r *MongoDBSearchReconcileHelper) ensureIngressTlsConfig(ctx context.Context) (mongot.Modification, statefulset.Modification, error) {
231235
if r.mdbSearch.Spec.Security.TLS == nil {
232-
mongotModification := func(config *mongot.Config) {
233-
config.Server.Wireproto.TLS.Mode = mongot.ConfigTLSModeDisabled
234-
}
235-
return mongotModification, statefulset.NOOP(), nil
236+
return mongot.NOOP(), statefulset.NOOP(), nil
236237
}
237238

238239
// TODO: validate that the certificate in the user-provided Secret in .spec.security.tls.certificateKeySecret is issued by the CA in the operator's CA Secret
@@ -244,8 +245,12 @@ func (r *MongoDBSearchReconcileHelper) ensureIngressTlsConfig(ctx context.Contex
244245

245246
mongotModification := func(config *mongot.Config) {
246247
certPath := tls.OperatorSecretMountPath + certFileName
247-
config.Server.Wireproto.TLS.Mode = mongot.ConfigTLSModeTLS
248-
config.Server.Wireproto.TLS.CertificateKeyFile = ptr.To(certPath)
248+
config.Server.Grpc.TLS.Mode = mongot.ConfigTLSModeTLS
249+
config.Server.Grpc.TLS.CertificateKeyFile = ptr.To(certPath)
250+
if config.Server.Wireproto != nil {
251+
config.Server.Wireproto.TLS.Mode = mongot.ConfigTLSModeTLS
252+
config.Server.Wireproto.TLS.CertificateKeyFile = ptr.To(certPath)
253+
}
249254
}
250255

251256
tlsSecret := r.mdbSearch.TLSOperatorSecretNamespacedName()
@@ -261,46 +266,34 @@ func (r *MongoDBSearchReconcileHelper) ensureIngressTlsConfig(ctx context.Contex
261266
return mongotModification, statefulsetModification, nil
262267
}
263268

264-
func (r *MongoDBSearchReconcileHelper) ensureEgressTlsConfig(ctx context.Context) (mongot.Modification, statefulset.Modification, error) {
269+
func (r *MongoDBSearchReconcileHelper) ensureEgressTlsConfig(ctx context.Context) (mongot.Modification, statefulset.Modification) {
265270
tlsSourceConfig := r.db.TLSConfig()
266271
if tlsSourceConfig == nil {
267-
return mongot.NOOP(), statefulset.NOOP(), nil
272+
return mongot.NOOP(), statefulset.NOOP()
268273
}
269274

270275
mongotModification := func(config *mongot.Config) {
271276
config.SyncSource.ReplicaSet.TLS = ptr.To(true)
277+
config.SyncSource.CertificateAuthorityFile = ptr.To(tls.CAMountPath + "/" + tlsSourceConfig.CAFileName)
278+
279+
// if the gRPC server is configured to accept TLS connections then toggle mTLS as well
280+
if config.Server.Grpc.TLS.Mode == mongot.ConfigTLSModeTLS {
281+
config.Server.Grpc.TLS.Mode = mongot.ConfigTLSModeMTLS
282+
config.Server.Grpc.TLS.CertificateAuthorityFile = config.SyncSource.CertificateAuthorityFile
283+
}
272284
}
273285

274-
_, containerSecurityContext := podtemplatespec.WithDefaultSecurityContextsModifications()
275286
caVolume := tlsSourceConfig.CAVolume
276-
trustStoreVolume := statefulset.CreateVolumeFromEmptyDir("cacerts")
277287
statefulsetModification := statefulset.WithPodSpecTemplate(podtemplatespec.Apply(
278288
podtemplatespec.WithVolume(caVolume),
279-
podtemplatespec.WithVolume(trustStoreVolume),
280-
podtemplatespec.WithInitContainer("init-cacerts", container.Apply(
281-
container.WithImage(r.buildImageString()),
282-
containerSecurityContext,
283-
container.WithVolumeMounts([]corev1.VolumeMount{
284-
statefulset.CreateVolumeMount(caVolume.Name, tls.CAMountPath, statefulset.WithReadOnly(true)),
285-
statefulset.CreateVolumeMount(trustStoreVolume.Name, "/java/trust-store", statefulset.WithReadOnly(false)),
286-
}),
287-
container.WithCommand([]string{"sh"}),
288-
container.WithArgs([]string{
289-
"-c",
290-
fmt.Sprintf(`
291-
cp /mongot-community/bin/jdk/lib/security/cacerts /java/trust-store/cacerts
292-
/mongot-community/bin/jdk/bin/keytool -keystore /java/trust-store/cacerts -storepass changeit -noprompt -trustcacerts -importcert -alias mongodcert -file %s/%s
293-
`, tls.CAMountPath, tlsSourceConfig.CAFileName),
294-
}),
295-
)),
296289
podtemplatespec.WithContainer(MongotContainerName, container.Apply(
297290
container.WithVolumeMounts([]corev1.VolumeMount{
298-
statefulset.CreateVolumeMount(trustStoreVolume.Name, "/mongot-community/bin/jdk/lib/security/cacerts", statefulset.WithReadOnly(true), statefulset.WithSubPath("cacerts")),
291+
statefulset.CreateVolumeMount(caVolume.Name, tls.CAMountPath, statefulset.WithReadOnly(true)),
299292
}),
300293
)),
301294
))
302295

303-
return mongotModification, statefulsetModification, nil
296+
return mongotModification, statefulsetModification
304297
}
305298

306299
func hashBytes(bytes []byte) string {
@@ -325,10 +318,17 @@ func buildSearchHeadlessService(search *searchv1.MongoDBSearch) corev1.Service {
325318
SetOwnerReferences(search.GetOwnerReferences())
326319

327320
serviceBuilder.AddPort(&corev1.ServicePort{
328-
Name: "mongot",
321+
Name: "mongot-wireproto",
322+
Protocol: corev1.ProtocolTCP,
323+
Port: search.GetMongotWireprotoPort(),
324+
TargetPort: intstr.FromInt32(search.GetMongotWireprotoPort()),
325+
})
326+
327+
serviceBuilder.AddPort(&corev1.ServicePort{
328+
Name: "mongot-grpc",
329329
Protocol: corev1.ProtocolTCP,
330-
Port: search.GetMongotPort(),
331-
TargetPort: intstr.FromInt32(search.GetMongotPort()),
330+
Port: search.GetMongotGrpcPort(),
331+
TargetPort: intstr.FromInt32(search.GetMongotGrpcPort()),
332332
})
333333

334334
serviceBuilder.AddPort(&corev1.ServicePort{
@@ -366,13 +366,24 @@ func createMongotConfig(search *searchv1.MongoDBSearch, db SearchSourceDBResourc
366366
DataPath: MongotDataPath,
367367
}
368368
config.Server = mongot.ConfigServer{
369-
Wireproto: &mongot.ConfigWireproto{
370-
Address: "0.0.0.0:27027",
369+
Grpc: &mongot.ConfigGrpc{
370+
Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotGrpcPort()),
371+
TLS: &mongot.ConfigGrpcTLS{
372+
Mode: mongot.ConfigTLSModeDisabled,
373+
},
374+
},
375+
}
376+
if search.IsWireprotoForced() {
377+
config.Server.Wireproto = &mongot.ConfigWireproto{
378+
Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotWireprotoPort()),
371379
Authentication: &mongot.ConfigAuthentication{
372380
Mode: "keyfile",
373381
KeyFile: TempKeyfilePath,
374382
},
375-
},
383+
TLS: &mongot.ConfigWireprotoTLS{
384+
Mode: mongot.ConfigTLSModeDisabled,
385+
},
386+
}
376387
}
377388
config.Metrics = mongot.ConfigMetrics{
378389
Enabled: true,
@@ -393,19 +404,27 @@ func GetMongodConfigParameters(search *searchv1.MongoDBSearch) map[string]any {
393404
if search.Spec.Security.TLS != nil {
394405
searchTLSMode = automationconfig.TLSModeRequired
395406
}
407+
408+
parameters := map[string]any{
409+
"mongotHost": mongotHostAndPort(search),
410+
"searchIndexManagementHostAndPort": mongotHostAndPort(search),
411+
"skipAuthenticationToSearchIndexManagementServer": false,
412+
"searchTLSMode": string(searchTLSMode),
413+
"useGrpcForSearch": !search.IsWireprotoForced(),
414+
}
415+
396416
return map[string]any{
397-
"setParameter": map[string]any{
398-
"mongotHost": mongotHostAndPort(search),
399-
"searchIndexManagementHostAndPort": mongotHostAndPort(search),
400-
"skipAuthenticationToSearchIndexManagementServer": false,
401-
"searchTLSMode": string(searchTLSMode),
402-
},
417+
"setParameter": parameters,
403418
}
404419
}
405420

406421
func mongotHostAndPort(search *searchv1.MongoDBSearch) string {
407422
svcName := search.SearchServiceNamespacedName()
408-
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", svcName.Name, svcName.Namespace, search.GetMongotPort())
423+
port := search.GetMongotGrpcPort()
424+
if search.IsWireprotoForced() {
425+
port = search.GetMongotWireprotoPort()
426+
}
427+
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", svcName.Name, svcName.Namespace, port)
409428
}
410429

411430
func (r *MongoDBSearchReconcileHelper) ValidateSingleMongoDBSearchForSearchSource(ctx context.Context) error {

0 commit comments

Comments
 (0)