Closed
Issue description
Hey!
I'm trying to skip malformed messages by extending KafkaConsumer and overriding its poll() method:
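import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class ExceptionHandlingKafkaConsumer<K, V> extends KafkaConsumer<K, V> {
    public ExceptionHandlingKafkaConsumer(final Map<String, Object> configs) {
        super(configs);
    }

    @Override
    public ConsumerRecords<K, V> poll(Duration timeout) {
        try {
            return super.poll(timeout);
        } catch (Exception e) {
            // Only swallow deserialization failures; rethrow everything else.
            if (!deserializationError(e)) {
                throw e;
            }
            // Skip the bad record and report an empty batch for this poll.
            this.seek(topicPartition, offset + 1); // partition and offset retrieved from exception instance
            return ConsumerRecords.empty();
        }
    }
}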
But then I get this error on startup:
java.lang.NoSuchFieldException: coordinator
at java.base/java.lang.Class.getDeclaredField(Class.java:2411)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.checkAutoCommitIsDisabled(AbstractParallelEoSStreamProcessor.java:343)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:199)
at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.<init>(ParallelEoSStreamProcessor.java:34)
at io.confluent.parallelconsumer.ParallelStreamProcessor.createEosStreamProcessor(ParallelStreamProcessor.java:28)
at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig.mediaParallelConsumer(MediaEventsReceiverConfig.java:69)
at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec.CGLIB$mediaParallelConsumer$1(<generated>)
at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec$$FastClassBySpringCGLIB$$ff4101fd.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331)
at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec.mediaParallelConsumer(<generated>)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
... 49 common frames omitted
Possible solutions
Workaround
Use composition over inheritance: wrap a KafkaConsumer behind the Consumer interface instead of extending it, so that AbstractParallelEoSStreamProcessor skips this check.
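One way to get composition without hand-writing every delegate method is a JDK dynamic proxy. The sketch below is only an illustration under assumptions: DelegatingProxy, ErrorHandler, and wrap are hypothetical names, and nothing here comes from the parallel-consumer code base. It forwards every call to the wrapped object and lets a callback decide what to do when one named method throws.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

public final class DelegatingProxy {

    /** Hypothetical callback: return a fallback result or rethrow the failure. */
    @FunctionalInterface
    public interface ErrorHandler {
        Object handle(Throwable failure) throws Throwable;
    }

    private DelegatingProxy() {}

    /**
     * Returns an object that implements iface by forwarding every call to delegate.
     * Failures thrown by guardedMethod are handed to onError; all others propagate.
     */
    @SuppressWarnings("unchecked")
    public static <T> T wrap(Class<T> iface, T delegate, String guardedMethod, ErrorHandler onError) {
        InvocationHandler handler = (proxy, method, args) -> {
            try {
                return method.invoke(delegate, args);
            } catch (InvocationTargetException e) {
                // Unwrap the exception the delegate actually threw.
                Throwable failure = e.getCause();
                if (method.getName().equals(guardedMethod)) {
                    return onError.handle(failure);
                }
                throw failure;
            }
        };
        return (T) Proxy.newProxyInstance(
                delegate.getClass().getClassLoader(), new Class<?>[] {iface}, handler);
    }
}
```

With Kafka this might be called as wrap(Consumer.class, kafkaConsumer, "poll", handler), where the handler rethrows anything that is not a deserialization error and otherwise seeks past the bad offset and returns ConsumerRecords.empty(). The resulting object implements Consumer but is not a KafkaConsumer subclass.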
Proper fix
Probably implement the auto-commit check (enable.auto.commit) without reflection. There is no guarantee that the consumer directly extends KafkaConsumer, so we can't reliably call #getSuperclass() on it to reach the private coordinator field.
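To illustrate why the lookup breaks for a subclass: Class#getDeclaredField only inspects the class it is called on, not its superclasses. The sketch below uses stand-in classes (BaseConsumer and CustomConsumer are hypothetical, standing in for KafkaConsumer and a user subclass) and a hypothetical findField helper that climbs the hierarchy. It is not the reflection-free fix proposed above, only a smaller change that would tolerate subclasses while still depending on a private field name.

```java
import java.lang.reflect.Field;
import java.util.Optional;

public final class FieldLookup {

    private FieldLookup() {}

    /** Searches the given type, then each superclass in turn, for a declared field. */
    public static Optional<Field> findField(Class<?> type, String name) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                return Optional.of(c.getDeclaredField(name));
            } catch (NoSuchFieldException e) {
                // Not declared at this level; keep climbing.
            }
        }
        return Optional.empty();
    }

    /** Stand-in for KafkaConsumer: owns the private field. */
    public static class BaseConsumer {
        private final Object coordinator = new Object();
    }

    /** Stand-in for a user subclass such as ExceptionHandlingKafkaConsumer. */
    public static class CustomConsumer extends BaseConsumer {}
}
```

Calling CustomConsumer.class.getDeclaredField("coordinator") directly throws NoSuchFieldException, matching the stack trace above, while findField(CustomConsumer.class, "coordinator") locates the field on BaseConsumer.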