Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 43 additions & 9 deletions spec/filters/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,18 @@
allow(plugin).to receive(:get_client).and_return(filter_client)
allow(filter_client).to receive(:serverless?).and_return(true)
allow(filter_client).to receive(:client).and_return(es_client)
allow(es_client).to receive(:info).with(a_hash_including(:headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER)).and_raise(
Elasticsearch::Transport::Transport::Errors::BadRequest.new
)

if elastic_ruby_v8_client_available?
allow(es_client).to receive(:info)
.with(a_hash_including(
:headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER))
.and_raise(Elastic::Transport::Transport::Errors::BadRequest.new)
else
allow(es_client).to receive(:info)
.with(a_hash_including(
:headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER))
.and_raise(Elasticsearch::Transport::Transport::Errors::BadRequest.new)
end
end

it "raises an exception when Elastic Api Version is not supported" do
Expand Down Expand Up @@ -103,7 +112,11 @@

before(:each) do
allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client)
allow(client).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
if elastic_ruby_v8_client_available?
allow(client).to receive(:es_transport_client_type).and_return('elastic_transport')
else
allow(client).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
end
allow(client).to receive(:search).and_return(response)
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:setup_serverless)
Expand Down Expand Up @@ -348,7 +361,11 @@

before do
allow(plugin).to receive(:get_client).and_return(client_double)
allow(client_double).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
if elastic_ruby_v8_client_available?
allow(client_double).to receive(:es_transport_client_type).and_return('elastic_transport')
else
allow(client_double).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
end
allow(client_double).to receive(:client).and_return(transport_double)
end

Expand Down Expand Up @@ -508,7 +525,12 @@ def wait_receive_request
# this spec is a safeguard to trigger an assessment of thread-safety should
# we choose a different transport adapter in the future.
transport_class = extract_transport(client).options.fetch(:transport_class)
expect(transport_class).to equal ::Elasticsearch::Transport::Transport::HTTP::Manticore
if elastic_ruby_v8_client_available?
allow(client).to receive(:es_transport_client_type).and_return("elastic_transport")
expect(transport_class).to equal ::Elastic::Transport::Transport::HTTP::Manticore
else
expect(transport_class).to equal ::Elasticsearch::Transport::Transport::HTTP::Manticore
end
end

it 'uses a client with sufficient connection pool size' do
Expand Down Expand Up @@ -823,7 +845,11 @@ def wait_receive_request

before(:each) do
allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client)
allow(client).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
if elastic_ruby_v8_client_available?
allow(client).to receive(:es_transport_client_type).and_return('elastic_transport')
else
allow(client).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
end
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:setup_serverless)
plugin.register
Expand All @@ -838,11 +864,19 @@ def wait_receive_request
end
end

# @note can be removed once gem depends on elasticsearch >= 6.x
def extract_transport(client) # on 7.x client.transport is a ES::Transport::Client
def extract_transport(client)
# on 7x: client.transport.transport
# on >=8.x: client.transport
client.transport.respond_to?(:transport) ? client.transport.transport : client.transport
end

def elastic_ruby_v8_client_available?
Elasticsearch::Transport
false
rescue NameError # NameError: uninitialized constant Elasticsearch::Transport if Elastic Ruby client is not available
true
end

class MockResponse
attr_reader :code, :headers

Expand Down