File tree Expand file tree Collapse file tree 1 file changed +8
-1
lines changed Expand file tree Collapse file tree 1 file changed +8
-1
lines changed Original file line number Diff line number Diff line change @@ -245,12 +245,15 @@ Baton KafkaConsumer::IncrementalUnassign(std::vector<RdKafka::TopicPartition*> p
245245
246246 RdKafka::Error* error = consumer->incremental_unassign (partitions);
247247
248+ std::vector<RdKafka::TopicPartition*> delete_partitions;
249+
248250 if (error == NULL ) {
249251 // For now, use two for loops. Make more efficient if needed at a later point.
250252 for (unsigned int i = 0 ; i < partitions.size (); i++) {
251253 for (unsigned int j = 0 ; j < m_partitions.size (); j++) {
252254 if (partitions[i]->partition () == m_partitions[j]->partition () &&
253255 partitions[i]->topic () == m_partitions[j]->topic ()) {
256+ delete_partitions.push_back (m_partitions[j]);
254257 m_partitions.erase (m_partitions.begin () + j);
255258 m_partition_cnt--;
256259 break ;
@@ -260,7 +263,11 @@ Baton KafkaConsumer::IncrementalUnassign(std::vector<RdKafka::TopicPartition*> p
260263 }
261264
262265 // Destroy the old list of partitions since we are no longer using it
263- RdKafka::TopicPartition::destroy (m_partitions);
266+ RdKafka::TopicPartition::destroy (delete_partitions);
267+
268+ // Destroy the partition args since those are only used to lookup the partitions
269+ // that needed to be deleted.
270+ RdKafka::TopicPartition::destroy (partitions);
264271
265272 return rdkafkaErrorToBaton (error);
266273}
You can’t perform that action at this time.
0 commit comments