- 
                Notifications
    You must be signed in to change notification settings 
- Fork 23
Delete Records Api implemented #141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| 🎉 All Contributor License Agreements have been signed. Ready to merge.  | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work, thanks for this change.
- merge master
- address comments
- Please update CHANGELOG.md and migration guide as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete-topic-records.js
        
          
                src/admin.cc
              
                Outdated
          
        
      |  | ||
| if (!info[0]->IsArray()) { | ||
| return Nan::ThrowError( | ||
| "Must provide array containg 'TopicPartition' objects"); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TopicPartitionOffset objects, right?
        
          
                src/admin.cc
              
                Outdated
          
        
      | } | ||
|  | ||
| int operation_timeout_ms = | ||
| GetParameter<int64_t>(options, "operation_timeout", 6000); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be 60000 (60s) - make this change everywhere
| if (!this._isConnected) { | ||
| throw new Error('Client is disconnected'); | ||
| } | ||
|  | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
 if (!options) {
    options = {};
  }Add this.
I also noticed this is missing in the above ListConsumerGroupOffsets method, add that also.
        
          
                src/admin.cc
              
                Outdated
          
        
      | v8::Local<v8::Object> options = Nan::New<v8::Object>(); | ||
| if (info.Length() > 2 && info[1]->IsObject()) { | ||
| options = info[1].As<v8::Object>(); | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for this. Just do
v8::Local<v8::Object> options = info[1].As<v8::Object>();
Same for ListConsumerGroupOffsets.
This will work with that change in admin.js which sets options to {} if it's unset.
        
          
                lib/kafkajs/_admin.js
              
                Outdated
          
        
      | const topicPartitionOffsets = partitions.map(({ partition, offset }) => ({ | ||
| topic, | ||
| partition, | ||
| offset, | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Offset is a string as per the type definition, you need to convert it.
So this needs to be in a loop, for example
for (const partition of partitions) {
  if (partition.offset === null || partition.offset === undefined) // throw error
  const offset = +partition.offset;
  if (isNaN(offset)) // throw error
  // add {topic, partition.partition, offset} to  topicPartitionOffsets
}
| 'operation-timeout': { | ||
| type: 'string', | ||
| short: 'o', | ||
| default: '6000', | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
60000
| console.error("Partition and offset should be numbers and provided in pairs."); | ||
| process.exit(1); | ||
| } | ||
| partitions.push({ partition, offset }); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
offset needs to actually be a string to comply with the API signature
| await producer.send({ topic: topicName, messages: messages }); | ||
|  | ||
| await expect( | ||
| admin.deleteTopicRecords({ topic: topicName, partitions: [{ partition: 0, offset: 4 }], timeout: 0 }) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and in all the other tests, the offset must be a string.
| partitions: [{ partition: 0, offset: 5 }], | ||
| }); | ||
|  | ||
| expect(records).toEqual([ | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remember to update the test after making error an optional field (omitted on success)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM modulo minor issues.
Address these two lint issues also:
src/common.cc:461:  At least two spaces is best between code and comments  [whitespace/comments] [2]
src/common.cc:1278:  Missing space before ( in if(  [whitespace/parens] [5]
        
          
                MIGRATION.md
              
                Outdated
          
        
      | * The `fetchOffsets` method is supported with additional `timeout` and | ||
| `requireStableOffsets` option but `resolveOffsets` option is not yet supported. | ||
| * The `deleteTopicRecords` method is supported with additional `timeout` | ||
| and `operationTiemout` option. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| and `operationTiemout` option. | |
| and `operationTimeout` options. | 
        
          
                src/admin.cc
              
                Outdated
          
        
      | v8::Local<v8::Array> delete_records_list = info[0].As<v8::Array>(); | ||
|  | ||
| if (delete_records_list->Length() == 0) { | ||
| return Nan::ThrowError("Must provide at least one TopicPartition"); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| return Nan::ThrowError("Must provide at least one TopicPartition"); | |
| return Nan::ThrowError("Must provide at least one TopicPartitionOffset"); | 
| # confluent-kafka-javascript v0.5.0 | ||
|  | ||
| v0.4.0 is a limited availability feature release. It is supported for all usage. | ||
|  | ||
| ## Enhancements | ||
|  | ||
| 1. Add support for an Admin API to delete records.(#141). | ||
|  | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| # confluent-kafka-javascript v0.5.0 | |
| v0.4.0 is a limited availability feature release. It is supported for all usage. | |
| ## Enhancements | |
| 1. Add support for an Admin API to delete records.(#141). | |
| # confluent-kafka-javascript v0.5.0 | |
| v0.5.0 is a limited availability feature release. It is supported for all usage. | |
| ## Enhancements | |
| 1. Add support for an Admin API to delete records.(#141). | |
Implemented delete records api in name of deleteTopicRecords and wrote tests and example for it.