Skip to content

Conversation

@PratRanj07
Copy link
Contributor

Implemented delete records api in name of deleteTopicRecords and wrote tests and example for it.

@PratRanj07 PratRanj07 requested review from a team as code owners November 4, 2024 08:06
@confluent-cla-assistant
Copy link

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copy link
Contributor

@milindl milindl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work, thanks for this change.

  1. merge master
  2. address comments
  3. Please update CHANGELOG.md and migration guide as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete-topic-records.js

src/admin.cc Outdated

if (!info[0]->IsArray()) {
return Nan::ThrowError(
"Must provide array containg 'TopicPartition' objects");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TopicPartitionOffset objects, right?

src/admin.cc Outdated
}

int operation_timeout_ms =
GetParameter<int64_t>(options, "operation_timeout", 6000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be 60000 (60s) - make this change everywhere

if (!this._isConnected) {
throw new Error('Client is disconnected');
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 if (!options) {
    options = {};
  }

Add this.

I also noticed this is missing in the above ListConsumerGroupOffsets method, add that also.

src/admin.cc Outdated
Comment on lines 1288 to 1291
v8::Local<v8::Object> options = Nan::New<v8::Object>();
if (info.Length() > 2 && info[1]->IsObject()) {
options = info[1].As<v8::Object>();
}
Copy link
Contributor

@milindl milindl Nov 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this. Just do

v8::Local<v8::Object> options = info[1].As<v8::Object>();

Same for ListConsumerGroupOffsets.

This will work with that change in admin.js which sets options to {} if it's unset.

const topicPartitionOffsets = partitions.map(({ partition, offset }) => ({
topic,
partition,
offset,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Offset is a string as per the type definition, you need to convert it.

So this needs to be in a loop, for example

for (const partition of partitions) {
  if (partition.offset === null || partition.offset === undefined) // throw error
  const offset = +partition.offset;
  if (isNaN(offset)) // throw error

  // add {topic, partition.partition, offset} to  topicPartitionOffsets
}

'operation-timeout': {
type: 'string',
short: 'o',
default: '6000',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

60000

console.error("Partition and offset should be numbers and provided in pairs.");
process.exit(1);
}
partitions.push({ partition, offset });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

offset needs to actually be a string to comply with the API signature

await producer.send({ topic: topicName, messages: messages });

await expect(
admin.deleteTopicRecords({ topic: topicName, partitions: [{ partition: 0, offset: 4 }], timeout: 0 })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and in all the other tests, the offset must be a string.

partitions: [{ partition: 0, offset: 5 }],
});

expect(records).toEqual([
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember to update the test after making error an optional field (omitted on success)

@PratRanj07 PratRanj07 requested a review from milindl November 7, 2024 18:55
Copy link
Contributor

@milindl milindl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM modulo minor issues.

Address these two lint issues also:

src/common.cc:461:  At least two spaces is best between code and comments  [whitespace/comments] [2]
src/common.cc:1278:  Missing space before ( in if(  [whitespace/parens] [5]

MIGRATION.md Outdated
* The `fetchOffsets` method is supported with additional `timeout` and
`requireStableOffsets` option but `resolveOffsets` option is not yet supported.
* The `deleteTopicRecords` method is supported with additional `timeout`
and `operationTiemout` option.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
and `operationTiemout` option.
and `operationTimeout` options.

src/admin.cc Outdated
v8::Local<v8::Array> delete_records_list = info[0].As<v8::Array>();

if (delete_records_list->Length() == 0) {
return Nan::ThrowError("Must provide at least one TopicPartition");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return Nan::ThrowError("Must provide at least one TopicPartition");
return Nan::ThrowError("Must provide at least one TopicPartitionOffset");

Comment on lines 1 to 8
# confluent-kafka-javascript v0.5.0

v0.4.0 is a limited availability feature release. It is supported for all usage.

## Enhancements

1. Add support for an Admin API to delete records.(#141).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# confluent-kafka-javascript v0.5.0
v0.4.0 is a limited availability feature release. It is supported for all usage.
## Enhancements
1. Add support for an Admin API to delete records.(#141).
# confluent-kafka-javascript v0.5.0
v0.5.0 is a limited availability feature release. It is supported for all usage.
## Enhancements
1. Add support for an Admin API to delete records.(#141).

@PratRanj07 PratRanj07 requested a review from milindl November 12, 2024 06:28
@PratRanj07 PratRanj07 merged commit 3099517 into master Nov 12, 2024
1 of 2 checks passed
@PratRanj07 PratRanj07 deleted the feature/deleteRecords branch November 12, 2024 08:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants