diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f5e0a99..25abffa3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 4.6.2 + - Added scroll clearing and better handling of scroll expiration [#128](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/128) + ## 4.6.1 - [DOC] Removed outdated compatibility notice [#124](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/124) diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index 5d248446..f10dafbf 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -204,9 +204,12 @@ def register transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('') - @client = Elasticsearch::Client.new(:hosts => hosts, :transport_options => transport_options, - :transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore, - :ssl => ssl_options) + @client = Elasticsearch::Client.new( + :hosts => hosts, + :transport_options => transport_options, + :transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore, + :ssl => ssl_options + ) end ## @@ -266,25 +269,41 @@ def do_run_slice(output_queue, slice_id=nil) slice_options = @options.merge(:body => LogStash::Json.dump(slice_query) ) logger.info("Slice starting", slice_id: slice_id, slices: @slices) unless slice_id.nil? - r = search_request(slice_options) - r['hits']['hits'].each { |hit| push_hit(hit, output_queue) } - logger.debug("Slice progress", slice_id: slice_id, slices: @slices) unless slice_id.nil? - - has_hits = r['hits']['hits'].any? + scroll_id = nil + begin + r = search_request(slice_options) - while has_hits && r['_scroll_id'] && !stop? - r = process_next_scroll(output_queue, r['_scroll_id']) + r['hits']['hits'].each { |hit| push_hit(hit, output_queue) } logger.debug("Slice progress", slice_id: slice_id, slices: @slices) unless slice_id.nil? - has_hits = r['has_hits'] + + has_hits = r['hits']['hits'].any? + scroll_id = r['_scroll_id'] + + while has_hits && scroll_id && !stop? + has_hits, scroll_id = process_next_scroll(output_queue, scroll_id) + logger.debug("Slice progress", slice_id: slice_id, slices: @slices) if logger.debug? && slice_id + end + logger.info("Slice complete", slice_id: slice_id, slices: @slices) unless slice_id.nil? + ensure + clear_scroll(scroll_id) end - logger.info("Slice complete", slice_id: slice_id, slices: @slices) unless slice_id.nil? end + ## + # @param output_queue [#<<] + # @param scroll_id [String]: a scroll id to resume + # @return [Array(Boolean,String)]: a tuple representing whether the response + # def process_next_scroll(output_queue, scroll_id) r = scroll_request(scroll_id) r['hits']['hits'].each { |hit| push_hit(hit, output_queue) } - {'has_hits' => r['hits']['hits'].any?, '_scroll_id' => r['_scroll_id']} + [r['hits']['hits'].any?, r['_scroll_id']] + rescue => e + # this will typically be triggered by a scroll timeout + logger.error("Scroll request error, aborting scroll", error: e.inspect) + # return no hits and original scroll_id so we can try to clear it + [false, scroll_id] end def push_hit(hit, output_queue) @@ -313,16 +332,21 @@ def push_hit(hit, output_queue) output_queue << event end + def clear_scroll(scroll_id) + @client.clear_scroll(scroll_id: scroll_id) if scroll_id + rescue => e + # ignore & log any clear_scroll errors + logger.warn("Ignoring clear_scroll exception", message: e.message) + end + def scroll_request scroll_id - client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll) + @client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll) end def search_request(options) - client.search(options) + @client.search(options) end - attr_reader :client - def hosts_default?(hosts) hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? ) end diff --git a/logstash-input-elasticsearch.gemspec b/logstash-input-elasticsearch.gemspec index 00e6b2bf..19445327 100644 --- a/logstash-input-elasticsearch.gemspec +++ b/logstash-input-elasticsearch.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-elasticsearch' - s.version = '4.6.1' + s.version = '4.6.2' s.licenses = ['Apache License (2.0)'] s.summary = "Reads query results from an Elasticsearch cluster" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/inputs/elasticsearch_spec.rb b/spec/inputs/elasticsearch_spec.rb index 85c08897..4027767c 100644 --- a/spec/inputs/elasticsearch_spec.rb +++ b/spec/inputs/elasticsearch_spec.rb @@ -8,9 +8,13 @@ require "time" require "date" -describe LogStash::Inputs::Elasticsearch do +class LogStash::Inputs::TestableElasticsearch < LogStash::Inputs::Elasticsearch + attr_reader :client +end + +describe LogStash::Inputs::TestableElasticsearch do - let(:plugin) { LogStash::Inputs::Elasticsearch.new(config) } + let(:plugin) { LogStash::Inputs::TestableElasticsearch.new(config) } let(:queue) { Queue.new } it_behaves_like "an interruptible input plugin" do @@ -32,6 +36,7 @@ } allow(esclient).to receive(:search) { { "hits" => { "hits" => [hit] } } } allow(esclient).to receive(:scroll) { { "hits" => { "hits" => [hit] } } } + allow(esclient).to receive(:clear_scroll).and_return(nil) end end @@ -76,6 +81,7 @@ expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) expect(client).to receive(:search).with(any_args).and_return(response) expect(client).to receive(:scroll).with({ :body => { :scroll_id => "cXVlcnlUaGVuRmV0Y2g" }, :scroll=> "1m" }).and_return(scroll_reponse) + expect(client).to receive(:clear_scroll).and_return(nil) event = input(config) do |pipeline, queue| queue.pop @@ -257,6 +263,8 @@ def synchronize_method!(object, method_name) expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) plugin.register + expect(client).to receive(:clear_scroll).and_return(nil) + # SLICE0 is a three-page scroll in which the last page is empty slice0_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 0, 'max' => 2})) expect(client).to receive(:search).with(hash_including(:body => slice0_query)).and_return(slice0_response0) @@ -360,6 +368,7 @@ def synchronize_method!(object, method_name) expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) expect(client).to receive(:search).with(any_args).and_return(response) allow(client).to receive(:scroll).with({ :body => {:scroll_id => "cXVlcnlUaGVuRmV0Y2g"}, :scroll => "1m" }).and_return(scroll_reponse) + allow(client).to receive(:clear_scroll).and_return(nil) end context 'when defining docinfo' do @@ -405,6 +414,7 @@ def synchronize_method!(object, method_name) "docinfo_target" => 'metadata_with_string' } } it 'thows an exception if the `docinfo_target` exist but is not of type hash' do + expect(client).not_to receive(:clear_scroll) plugin.register expect { plugin.run([]) }.to raise_error(Exception, /incompatible event/) end