Skip to content

NoSuchFieldException when using consumer inherited from KafkaConsumer #195

@micr0farad

Description

@micr0farad

Issue description

Hey!
I'm trying to skip malformed messages by inheriting from KafkaConsumer and altering poll() method behavior.

class ExceptionHandlingKafkaConsumer<K, V> extends KafkaConsumer<K, V> { 
    public ExceptionHandlingKafkaConsumer(final Map<String, Object> configs) {
        super(configs);
    }

    @Override
    public ConsumerRecords<K, V> poll(Duration timeout) {
        try {
            return super.poll(timeout);
        }
        catch (Exception e) {
            if(!deserializationError(e)) {
                throw e;
            }
            this.seek(topicPartition, offset + 1); // partition and offset retrieved from exception instance
            return ConsumerRecords.empty();
        }
}

But then I get this error on startup:

java.lang.NoSuchFieldException: coordinator
	at java.base/java.lang.Class.getDeclaredField(Class.java:2411)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.checkAutoCommitIsDisabled(AbstractParallelEoSStreamProcessor.java:343)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:199)
	at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.<init>(ParallelEoSStreamProcessor.java:34)
	at io.confluent.parallelconsumer.ParallelStreamProcessor.createEosStreamProcessor(ParallelStreamProcessor.java:28)
	at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig.mediaParallelConsumer(MediaEventsReceiverConfig.java:69)
	at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec.CGLIB$mediaParallelConsumer$1(<generated>)
	at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec$$FastClassBySpringCGLIB$$ff4101fd.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
	at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331)
	at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec.mediaParallelConsumer(<generated>)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
	... 49 common frames omitted

Possible solutions

Workaround

Use composition over inheritance. So AbstractParallelEoSStreamProcessor will skip this check.

Proper fix

Probably implement auto.commit.enabled check without using reflection as there's no guarantee that consumer will be directly inherited from KafkaConsumer so we can't reliably use #getSuperclass() on it.

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions