Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 4.6.2
- Added scroll clearing and better handling of scroll expiration [#128](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/128)

## 4.6.1
- [DOC] Removed outdated compatibility notice [#124](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/124)

58 changes: 41 additions & 17 deletions lib/logstash/inputs/elasticsearch.rb
@@ -204,9 +204,12 @@ def register

transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('')

@client = Elasticsearch::Client.new(:hosts => hosts, :transport_options => transport_options,
:transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore,
:ssl => ssl_options)
@client = Elasticsearch::Client.new(
:hosts => hosts,
:transport_options => transport_options,
:transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore,
:ssl => ssl_options
)
end

##
@@ -266,25 +269,41 @@ def do_run_slice(output_queue, slice_id=nil)
slice_options = @options.merge(:body => LogStash::Json.dump(slice_query) )

logger.info("Slice starting", slice_id: slice_id, slices: @slices) unless slice_id.nil?
r = search_request(slice_options)

r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
logger.debug("Slice progress", slice_id: slice_id, slices: @slices) unless slice_id.nil?

has_hits = r['hits']['hits'].any?
scroll_id = nil
begin
r = search_request(slice_options)

while has_hits && r['_scroll_id'] && !stop?
r = process_next_scroll(output_queue, r['_scroll_id'])
r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
logger.debug("Slice progress", slice_id: slice_id, slices: @slices) unless slice_id.nil?
has_hits = r['has_hits']

has_hits = r['hits']['hits'].any?
scroll_id = r['_scroll_id']

while has_hits && scroll_id && !stop?
has_hits, scroll_id = process_next_scroll(output_queue, scroll_id)
logger.debug("Slice progress", slice_id: slice_id, slices: @slices) if logger.debug? && slice_id
end
logger.info("Slice complete", slice_id: slice_id, slices: @slices) unless slice_id.nil?
ensure
clear_scroll(scroll_id)
end
logger.info("Slice complete", slice_id: slice_id, slices: @slices) unless slice_id.nil?
end

##
# @param output_queue [#<<]
# @param scroll_id [String]: a scroll id to resume
# @return [Array(Boolean,String)]: a tuple of whether the response contained any hits, and the scroll id to use for the next request
#
def process_next_scroll(output_queue, scroll_id)
r = scroll_request(scroll_id)
r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
{'has_hits' => r['hits']['hits'].any?, '_scroll_id' => r['_scroll_id']}
[r['hits']['hits'].any?, r['_scroll_id']]
rescue => e
# this will typically be triggered by a scroll timeout
logger.error("Scroll request error, aborting scroll", error: e.inspect)
# return no hits and original scroll_id so we can try to clear it
[false, scroll_id]
end

def push_hit(hit, output_queue)
@@ -313,16 +332,21 @@ def push_hit(hit, output_queue)
output_queue << event
end

def clear_scroll(scroll_id)
@client.clear_scroll(scroll_id: scroll_id) if scroll_id
rescue => e
# ignore & log any clear_scroll errors
logger.warn("Ignoring clear_scroll exception", message: e.message)
end

def scroll_request scroll_id
client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
@client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
end

def search_request(options)
client.search(options)
@client.search(options)
end

attr_reader :client

def hosts_default?(hosts)
hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? )
end
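For context, here is a minimal standalone sketch of the search → scroll → clear_scroll flow that the changes above implement, written directly against the elasticsearch-ruby client outside of Logstash. The host, index name, query, and page size are placeholder assumptions, and `puts` stands in for the plugin's `push_hit`.

```ruby
# Simplified sketch (not the plugin itself) of the pattern in the diff above.
require 'elasticsearch'

client = Elasticsearch::Client.new(hosts: ['localhost:9200'])

scroll_id = nil
begin
  r = client.search(index: 'logs', scroll: '1m', size: 1000,
                    body: { query: { match_all: {} } })
  scroll_id = r['_scroll_id']
  hits = r['hits']['hits']

  while hits.any? && scroll_id
    hits.each { |hit| puts hit['_source'] } # stand-in for push_hit
    begin
      r = client.scroll(scroll: '1m', body: { scroll_id: scroll_id })
      scroll_id = r['_scroll_id']
      hits = r['hits']['hits']
    rescue => e
      # typically a scroll timeout; stop paging but keep the id so it can be cleared
      warn "scroll request failed: #{e.inspect}"
      break
    end
  end
ensure
  # release the server-side search context even on error or early exit
  begin
    client.clear_scroll(scroll_id: scroll_id) if scroll_id
  rescue => e
    warn "clear_scroll failed (ignored): #{e.message}"
  end
end
```

Clearing the scroll in an `ensure` block is what releases the server-side search context even when a scroll request expires mid-run, which is the behavior the new `clear_scroll` helper adds.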
2 changes: 1 addition & 1 deletion logstash-input-elasticsearch.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-elasticsearch'
s.version = '4.6.1'
s.version = '4.6.2'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads query results from an Elasticsearch cluster"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
14 changes: 12 additions & 2 deletions spec/inputs/elasticsearch_spec.rb
@@ -8,9 +8,13 @@
require "time"
require "date"

describe LogStash::Inputs::Elasticsearch do
class LogStash::Inputs::TestableElasticsearch < LogStash::Inputs::Elasticsearch
attr_reader :client
end

describe LogStash::Inputs::TestableElasticsearch do

let(:plugin) { LogStash::Inputs::Elasticsearch.new(config) }
let(:plugin) { LogStash::Inputs::TestableElasticsearch.new(config) }
let(:queue) { Queue.new }

it_behaves_like "an interruptible input plugin" do
@@ -32,6 +36,7 @@
}
allow(esclient).to receive(:search) { { "hits" => { "hits" => [hit] } } }
allow(esclient).to receive(:scroll) { { "hits" => { "hits" => [hit] } } }
allow(esclient).to receive(:clear_scroll).and_return(nil)
end
end

@@ -76,6 +81,7 @@
expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
expect(client).to receive(:search).with(any_args).and_return(response)
expect(client).to receive(:scroll).with({ :body => { :scroll_id => "cXVlcnlUaGVuRmV0Y2g" }, :scroll=> "1m" }).and_return(scroll_reponse)
expect(client).to receive(:clear_scroll).and_return(nil)

event = input(config) do |pipeline, queue|
queue.pop
@@ -257,6 +263,8 @@ def synchronize_method!(object, method_name)
expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
plugin.register

expect(client).to receive(:clear_scroll).and_return(nil)

# SLICE0 is a three-page scroll in which the last page is empty
slice0_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 0, 'max' => 2}))
expect(client).to receive(:search).with(hash_including(:body => slice0_query)).and_return(slice0_response0)
@@ -360,6 +368,7 @@ def synchronize_method!(object, method_name)
expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
expect(client).to receive(:search).with(any_args).and_return(response)
allow(client).to receive(:scroll).with({ :body => {:scroll_id => "cXVlcnlUaGVuRmV0Y2g"}, :scroll => "1m" }).and_return(scroll_reponse)
allow(client).to receive(:clear_scroll).and_return(nil)
end

context 'when defining docinfo' do
Expand Down Expand Up @@ -405,6 +414,7 @@ def synchronize_method!(object, method_name)
"docinfo_target" => 'metadata_with_string'
} }
it 'throws an exception if the `docinfo_target` exists but is not of type hash' do
expect(client).not_to receive(:clear_scroll)
plugin.register
expect { plugin.run([]) }.to raise_error(Exception, /incompatible event/)
end
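As a rough illustration of how the new behavior can be exercised in isolation (this is not the plugin's actual spec), a test double can assert that `clear_scroll` is still called when a scroll request fails. The double and the inline loop below are simplified stand-ins for the client stubbing and `do_run_slice` logic shown in the diff above.

```ruby
# Simplified sketch: verify a failing scroll still results in the scroll being cleared.
require 'rspec'

RSpec.describe 'scroll clearing on scroll failure' do
  it 'clears the scroll even when the scroll request raises' do
    client = double('es-client')
    allow(client).to receive(:search).and_return(
      { '_scroll_id' => 'abc', 'hits' => { 'hits' => [{ '_source' => {} }] } }
    )
    allow(client).to receive(:scroll).and_raise(RuntimeError, 'scroll expired')
    expect(client).to receive(:clear_scroll).with(scroll_id: 'abc')

    # drive a scroll loop roughly equivalent to the plugin's do_run_slice
    scroll_id = nil
    begin
      r = client.search
      scroll_id = r['_scroll_id']
      client.scroll(body: { scroll_id: scroll_id }, scroll: '1m')
    rescue RuntimeError
      # swallowed, mirroring process_next_scroll's rescue of scroll timeouts
    ensure
      client.clear_scroll(scroll_id: scroll_id) if scroll_id
    end
  end
end
```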