Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.17.1
- Add elastic-transport client support used in elasticsearch-ruby 8.x [#192](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/192)

## 3.17.0
- Added support for custom headers [#190](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/190)

Expand Down
4 changes: 3 additions & 1 deletion lib/logstash/filters/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
require "monitor"

require_relative "elasticsearch/client"
require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"

class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
config_name "elasticsearch"
Expand Down Expand Up @@ -183,6 +182,9 @@ def register

test_connection!
setup_serverless
if get_client.es_transport_client_type == "elasticsearch_transport"
require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
end
end # def register

def filter(event)
Expand Down
18 changes: 16 additions & 2 deletions lib/logstash/filters/elasticsearch/client.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# encoding: utf-8
require "elasticsearch"
require "base64"
require "elasticsearch/transport/transport/http/manticore"


module LogStash
module Filters
class ElasticsearchClient

attr_reader :client
attr_reader :es_transport_client_type

BUILD_FLAVOR_SERVERLESS = 'serverless'.freeze
DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze
Expand Down Expand Up @@ -44,7 +44,7 @@ def initialize(logger, hosts, options = {})

client_options = {
hosts: hosts,
transport_class: ::Elasticsearch::Transport::Transport::HTTP::Manticore,
transport_class: get_transport_client_class,
transport_options: transport_options,
ssl: ssl_options,
retry_on_failure: options[:retry_on_failure],
Expand Down Expand Up @@ -98,6 +98,20 @@ def setup_api_key(api_key)
token = ::Base64.strict_encode64(api_key.value)
{ 'Authorization' => "ApiKey #{token}" }
end

def get_transport_client_class
# LS-core includes `elasticsearch` gem. The gem is composed of two separate gems: `elasticsearch-api` and `elasticsearch-transport`
# And now `elasticsearch-transport` is old, instead we have `elastic-transport`.
# LS-core updated `elasticsearch` > 8: https://github.com/elastic/logstash/pull/17161
# Following source bits are for the compatibility to support both `elasticsearch-transport` and `elastic-transport` gems
require "elasticsearch/transport/transport/http/manticore"
es_transport_client_type = "elasticsearch_transport"
::Elasticsearch::Transport::Transport::HTTP::Manticore
rescue ::LoadError
require "elastic/transport/transport/http/manticore"
es_transport_client_type = "elastic_transport"
::Elastic::Transport::Transport::HTTP::Manticore
end
end
end
end
4 changes: 2 additions & 2 deletions logstash-filter-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-elasticsearch'
s.version = '3.17.0'
s.version = '3.17.1'
s.licenses = ['Apache License (2.0)']
s.summary = "Copies fields from previous log events in Elasticsearch to current events "
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -21,7 +21,7 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'elasticsearch', ">= 7.14.9" # LS >= 6.7 and < 7.14 all used version 5.0.5
s.add_runtime_dependency 'elasticsearch', ">= 7.14.9", '< 9'
s.add_runtime_dependency 'manticore', ">= 0.7.1"
s.add_runtime_dependency 'logstash-mixin-ca_trusted_fingerprint_support', '~> 1.0'
s.add_runtime_dependency 'logstash-mixin-normalize_config_support', '~>1.0'
Expand Down
49 changes: 43 additions & 6 deletions spec/filters/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,18 @@
allow(plugin).to receive(:get_client).and_return(filter_client)
allow(filter_client).to receive(:serverless?).and_return(true)
allow(filter_client).to receive(:client).and_return(es_client)
allow(es_client).to receive(:info).with(a_hash_including(:headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER)).and_raise(
Elasticsearch::Transport::Transport::Errors::BadRequest.new
)

if elastic_ruby_v8_client_available?
allow(es_client).to receive(:info)
.with(a_hash_including(
:headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER))
.and_raise(Elastic::Transport::Transport::Errors::BadRequest.new)
else
allow(es_client).to receive(:info)
.with(a_hash_including(
:headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER))
.and_raise(Elasticsearch::Transport::Transport::Errors::BadRequest.new)
end
end

it "raises an exception when Elastic Api Version is not supported" do
Expand Down Expand Up @@ -103,6 +112,11 @@

before(:each) do
allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client)
if elastic_ruby_v8_client_available?
allow(client).to receive(:es_transport_client_type).and_return('elastic_transport')
else
allow(client).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
end
allow(client).to receive(:search).and_return(response)
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:setup_serverless)
Expand Down Expand Up @@ -347,6 +361,11 @@

before do
allow(plugin).to receive(:get_client).and_return(client_double)
if elastic_ruby_v8_client_available?
allow(client_double).to receive(:es_transport_client_type).and_return('elastic_transport')
else
allow(client_double).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
end
allow(client_double).to receive(:client).and_return(transport_double)
end

Expand Down Expand Up @@ -506,7 +525,12 @@ def wait_receive_request
# this spec is a safeguard to trigger an assessment of thread-safety should
# we choose a different transport adapter in the future.
transport_class = extract_transport(client).options.fetch(:transport_class)
expect(transport_class).to equal ::Elasticsearch::Transport::Transport::HTTP::Manticore
if elastic_ruby_v8_client_available?
allow(client).to receive(:es_transport_client_type).and_return("elastic_transport")
expect(transport_class).to equal ::Elastic::Transport::Transport::HTTP::Manticore
else
expect(transport_class).to equal ::Elasticsearch::Transport::Transport::HTTP::Manticore
end
end

it 'uses a client with sufficient connection pool size' do
Expand Down Expand Up @@ -821,6 +845,11 @@ def wait_receive_request

before(:each) do
allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client)
if elastic_ruby_v8_client_available?
allow(client).to receive(:es_transport_client_type).and_return('elastic_transport')
else
allow(client).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
end
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:setup_serverless)
plugin.register
Expand All @@ -835,11 +864,19 @@ def wait_receive_request
end
end

# @note can be removed once gem depends on elasticsearch >= 6.x
def extract_transport(client) # on 7.x client.transport is a ES::Transport::Client
def extract_transport(client)
# on 7x: client.transport.transport
# on >=8.x: client.transport
client.transport.respond_to?(:transport) ? client.transport.transport : client.transport
end

def elastic_ruby_v8_client_available?
Elasticsearch::Transport
false
rescue NameError # NameError: uninitialized constant Elasticsearch::Transport if Elastic Ruby client is not available
true
end

class MockResponse
attr_reader :code, :headers

Expand Down