22require "logstash/devutils/rspec/spec_helper"
33require "logstash/inputs/elasticsearch"
44require "elasticsearch"
5- require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'
65
7- describe LogStash ::Inputs ::Elasticsearch , :ecs_compatibility_support do
8- let ( :plugin ) { described_class . new ( config ) }
6+ describe LogStash ::Inputs ::Elasticsearch ::Esql do
97 let ( :client ) { instance_double ( Elasticsearch ::Client ) }
10- let ( :queue ) { Queue . new }
11- let ( :cluster_info ) { { "version" => { "number" => "8.11.0" , "build_flavor" => "default" } } }
12-
13- let ( :config ) do
8+ let ( :esql_client ) { double ( "esql-client" ) }
9+ let ( :plugin ) { instance_double ( LogStash ::Inputs ::Elasticsearch , params : plugin_config ) }
10+ let ( :plugin_config ) do
1411 {
1512 "query" => "FROM test-index | STATS count() BY field" ,
16- "response_type" => "esql" ,
1713 "retries" => 3
1814 }
1915 end
16+ let ( :esql_executor ) { described_class . new ( client , plugin ) }
2017
21- describe "#initialize " do
18+ describe "when initializes " do
2219 it "sets up the ESQL client with correct parameters" do
23- expect ( plugin . instance_variable_get ( :@query ) ) . to eq ( config [ "query" ] )
24- expect ( plugin . instance_variable_get ( :@response_type ) ) . to eq ( config [ "response_type" ] )
25- expect ( plugin . instance_variable_get ( :@retries ) ) . to eq ( config [ "retries" ] )
20+ expect ( esql_executor . instance_variable_get ( :@query ) ) . to eq ( plugin_config [ "query" ] )
21+ expect ( esql_executor . instance_variable_get ( :@retries ) ) . to eq ( plugin_config [ "retries" ] )
2622 end
2723 end
2824
29- describe "#register" do
30- before ( :each ) do
31- Elasticsearch ::Client . send ( :define_method , :ping ) { }
32- allow_any_instance_of ( Elasticsearch ::Client ) . to receive ( :info ) . and_return ( cluster_info )
25+ describe "when faces error while retrying" do
26+ it "retries the given block the specified number of times" do
27+ attempts = 0
28+ result = esql_executor . retryable ( "Test Job" ) do
29+ attempts += 1
30+ raise StandardError if attempts < 3
31+ "success"
32+ end
33+ expect ( attempts ) . to eq ( 3 )
34+ expect ( result ) . to eq ( "success" )
3335 end
34- it "creates ES|QL executor" do
35- plugin . register
36- expect ( plugin . instance_variable_get ( :@query_executor ) ) . to be_an_instance_of ( LogStash ::Inputs ::Elasticsearch ::Esql )
36+
37+ it "returns false if the block fails all attempts" do
38+ result = esql_executor . retryable ( "Test Job" ) do
39+ raise StandardError
40+ end
41+ expect ( result ) . to eq ( false )
3742 end
3843 end
3944
40- describe "#validation" do
45+ describe "when executing chain of processes" do
46+ let ( :output_queue ) { Queue . new }
47+ let ( :response ) { { 'values' => [ %w[ foo bar ] ] , 'columns' => [ { 'name' => 'id' } , { 'name' => 'val' } ] } }
4148
42- describe "LS version" do
43- context "when compatible" do
44- before ( :each ) do
45- stub_const ( "LogStash::VERSION" , "8.11.0" )
46- end
47-
48- it "does not raise an error" do
49- expect { plugin . send ( :validate_ls_version_for_esql_support! ) } . not_to raise_error
50- end
51- end
52-
53- context "when incompatible" do
54- before ( :each ) do
55- stub_const ( "LOGSTASH_VERSION" , "8.10.0" )
56- end
49+ before do
50+ allow ( esql_executor ) . to receive ( :retryable ) . and_yield
51+ allow ( client ) . to receive_message_chain ( :esql , :query ) . and_return ( response )
52+ allow ( plugin ) . to receive ( :decorate_and_push_to_queue )
53+ end
5754
58- it "raises a runtime error" do
59- expect { plugin . send ( :validate_ls_version_for_esql_support! ) }
60- . to raise_error ( RuntimeError , /Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least 8.17.4/ )
61- end
62- end
55+ it "executes the ESQL query and processes the results" do
56+ allow ( response ) . to receive ( :headers ) . and_return ( { } )
57+ esql_executor . do_run ( output_queue )
58+ expect ( plugin ) . to have_received ( :decorate_and_push_to_queue ) . with ( output_queue , { 'id' => 'foo' , 'val' => 'bar' } )
6359 end
6460
65- describe "ES version" do
66- before ( :each ) do
67- allow ( plugin ) . to receive ( :es_version ) . and_return ( "8.10.5" )
68- end
61+ it "logs a warning if the response contains a warning header" do
62+ allow ( response ) . to receive ( :headers ) . and_return ( { "warning" => "some warning" } )
63+ expect ( esql_executor . logger ) . to receive ( :warn ) . with ( "ES|QL executor received warning" , { :message => "some warning" } )
64+ esql_executor . do_run ( output_queue )
65+ end
6966
70- context "when incompatible" do
71- it "raises a runtime error" do
72- expect { plugin . send ( :validate_es_for_esql_support! ) }
73- . to raise_error ( RuntimeError , /Connected Elasticsearch 8.10.5 version does not supports ES|QL. Please upgrade it./ )
74- end
75- end
67+ it "does not log a warning if the response does not contain a warning header" do
68+ allow ( response ) . to receive ( :headers ) . and_return ( { } )
69+ expect ( esql_executor . logger ) . not_to receive ( :warn )
70+ esql_executor . do_run ( output_queue )
7671 end
72+ end
7773
78- describe "ES|QL query" do
79- context "when query is valid" do
80- it "does not raise an error" do
81- expect { plugin . send ( :validate_esql_query! ) } . not_to raise_error
82- end
83- end
8474
85- context "when query is empty" do
86- let ( :config ) do
87- {
88- "query" => " "
89- }
90- end
75+ describe "when starts processing the response" do
76+ let ( :output_queue ) { Queue . new }
77+ let ( :values ) { [ %w[ foo bar ] ] }
78+ let ( :columns ) { [ { 'name' => 'id' } , { 'name' => 'val' } ] }
9179
92- # TODO: make shared spec
93- it "raises a configuration error" do
94- expect { plugin . send ( :validate_esql_query! ) }
95- . to raise_error ( LogStash :: ConfigurationError , /`query` cannot be empty/ )
96- end
97- end
80+ it "processes the ESQL response and pushes events to the output queue" do
81+ allow ( plugin ) . to receive ( :decorate_and_push_to_queue )
82+ esql_executor . send ( :process_response , values , columns , output_queue )
83+ expect ( plugin ) . to have_received ( :decorate_and_push_to_queue ) . with ( output_queue , { 'id' => 'foo' , 'val' => 'bar' } )
84+ end
85+ end
9886
99- context "when query doesn't align with ES syntax" do
100- let ( :config ) do
101- {
102- "query" => "RANDOM query"
103- }
104- end
87+ describe "when maps column and values" do
88+ let ( :columns ) { [ { 'name' => 'id' } , { 'name' => 'val' } ] }
89+ let ( :values ) { %w[ foo bar ] }
10590
106- it "raises a configuration error" do
107- source_commands = %w[ FROM ROW SHOW ]
108- expect { plugin . send ( :validate_esql_query! ) }
109- . to raise_error ( LogStash ::ConfigurationError , "`query` needs to start with any of #{ source_commands } " )
110- end
111- end
91+ it "maps column names to their corresponding values" do
92+ result = esql_executor . send ( :map_column_and_values , columns , values )
93+ expect ( result ) . to eq ( { 'id' => 'foo' , 'val' => 'bar' } )
11294 end
11395 end
114-
115- end
96+ end
0 commit comments