Skip to content

Commit f847b8d

Browse files
SAMZA-2209: Explicitly handle empty Optionals in ContainerStorageManager when using StorageConfig (apache#1045)
1 parent 1772177 commit f847b8d

File tree

1 file changed

+9
-5
lines changed

1 file changed

+9
-5
lines changed

samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -503,10 +503,12 @@ private Map<TaskName, Map<String, SideInputsProcessor>> createSideInputProcessor
503503
.put(storeName, SerdeUtils.deserialize("Side Inputs Processor",
504504
sideInputsProcessorSerializedInstance.get()));
505505
} else {
506+
String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName)
507+
.orElseThrow(() -> new SamzaException(
508+
String.format("Could not find side inputs processor factory for store: %s", storeName)));
506509
sideInputStoresToProcessors.get(taskName)
507-
.put(storeName, Util.getObj(config.getSideInputsProcessorFactory(storeName).get(),
508-
SideInputsProcessorFactory.class).getSideInputsProcessor(config,
509-
taskInstanceMetrics.get(taskName).registry()));
510+
.put(storeName, Util.getObj(sideInputsProcessorFactoryClassName, SideInputsProcessorFactory.class)
511+
.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry()));
510512
}
511513
}
512514
});
@@ -517,8 +519,10 @@ private Map<TaskName, Map<String, SideInputsProcessor>> createSideInputProcessor
517519
for (String storeName : sideInputSystemStreams.keySet()) {
518520

519521
// have to use the right serde because the sideInput stores are created
520-
Serde keySerde = serdes.get(config.getStorageKeySerde(storeName).get());
521-
Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName).get());
522+
Serde keySerde = serdes.get(config.getStorageKeySerde(storeName)
523+
.orElseThrow(() -> new SamzaException("Could not find storage key serde for store: " + storeName)));
524+
Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName)
525+
.orElseThrow(() -> new SamzaException("Could not find storage msg serde for store: " + storeName)));
522526
sideInputStoresToProcessors.get(taskName).put(storeName, new SideInputsProcessor() {
523527
@Override
524528
public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {

0 commit comments

Comments
 (0)