From 09425821efecbb0e7ddcc44af146be00ddd233f8 Mon Sep 17 00:00:00 2001 From: kaisecheng <69120390+kaisecheng@users.noreply.github.com> Date: Mon, 17 Feb 2025 11:08:19 +0000 Subject: [PATCH] CPM handle 404 response gracefully with user-friendly log (#17052) Starting from es-output 12.0.2, a 404 response is treated as an error. Previously, central pipeline management considered 404 as an empty pipeline, not an error. This commit restores the expected behavior by handling 404 gracefully and logs a user-friendly message. It also removes the redundant cache of pipeline in CPM Fixes: #17035 (cherry picked from commit e896cd727dda41a0bda9ce6e65c4fb391c3d1b4f) --- .../config_management/elasticsearch_source.rb | 21 +++++----- .../elasticsearch_source_spec.rb | 39 +++++++++---------- 2 files changed, 28 insertions(+), 32 deletions(-) diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb index 6ad08ad116a..0baf448f601 100644 --- a/x-pack/lib/config_management/elasticsearch_source.rb +++ b/x-pack/lib/config_management/elasticsearch_source.rb @@ -59,20 +59,19 @@ def get_pipeline_fetcher(es_version) def pipeline_configs logger.trace("Fetch remote config pipeline", :pipeline_ids => pipeline_ids) - begin - license_check(true) - rescue LogStash::LicenseChecker::LicenseError => e - if @cached_pipelines.nil? - raise e - else - return @cached_pipelines - end - end + license_check(true) es_version = get_es_version fetcher = get_pipeline_fetcher(es_version) - fetcher.fetch_config(es_version, pipeline_ids, client) - @cached_pipelines = fetcher.get_pipeline_ids.collect do |pid| + begin + fetcher.fetch_config(es_version, pipeline_ids, client) + rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + # es-output 12.0.2 throws 404 as error, but we want to handle it as empty config + return [] if e.response_code == 404 + raise e + end + + fetcher.get_pipeline_ids.collect do |pid| get_pipeline(pid, fetcher) end.compact end diff --git a/x-pack/spec/config_management/elasticsearch_source_spec.rb b/x-pack/spec/config_management/elasticsearch_source_spec.rb index 285c520a229..b16916ca5e6 100644 --- a/x-pack/spec/config_management/elasticsearch_source_spec.rb +++ b/x-pack/spec/config_management/elasticsearch_source_spec.rb @@ -103,23 +103,7 @@ "status" => 400} } - let(:elasticsearch_8_err_response) { - {"error" => - {"root_cause" => - [{"type" => "index_not_found_exception", - "reason" => "no such index [.logstash]", - "resource.type" => "index_expression", - "resource.id" => ".logstash", - "index_uuid" => "_na_", - "index" => ".logstash"}], - "type" => "index_not_found_exception", - "reason" => "no such index [.logstash]", - "resource.type" => "index_expression", - "resource.id" => ".logstash", - "index_uuid" => "_na_", - "index" => ".logstash"}, - "status" => 404} - } + let(:elasticsearch_8_err_response) { {"error" => "Incorrect HTTP method for uri", "status" => 405} } before do extension.additionals_settings(system_settings) @@ -466,13 +450,13 @@ }] }.to_json end - let(:es_path) { ".logstash/_mget" } + let(:legacy_api) { ".logstash/_mget" } let(:request_body_string) { LogStash::Json.dump({ "docs" => [{ "_id" => pipeline_id }] }) } before do allow(mock_client).to receive(:get).with(system_indices_url_regex).and_return(LogStash::Json.load(elasticsearch_response)) allow(mock_client).to receive(:get).with("/").and_return(es_version_response) - allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_7_9_response)) + allow(mock_client).to receive(:post).with(legacy_api, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_7_9_response)) allow(mock_license_client).to receive(:get).with('_xpack').and_return(valid_xpack_response) allow(mock_license_client).to receive(:get).with('/').and_return(cluster_info(LOGSTASH_VERSION)) @@ -493,7 +477,7 @@ before :each do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) allow_any_instance_of(described_class).to receive(:logger).and_return(logger_stub) - allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_7_9_response)) + allow(mock_client).to receive(:post).with(legacy_api, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_7_9_response)) end let(:config) { "input { generator {} } filter { mutate {} } output { }" } @@ -734,6 +718,7 @@ describe "when exception occur" do let(:elasticsearch_response) { "" } + let(:bad_response_404) { LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(404, "url", "request_body", "response_body") } before do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) @@ -747,9 +732,21 @@ it "raises the exception upstream in [7.9]" do allow(mock_client).to receive(:get).with("/").and_return(es_version_7_9_response) - expect(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_raise("Something bad") + expect(mock_client).to receive(:post).with(legacy_api, {}, request_body_string).and_raise("Something bad") expect { subject.pipeline_configs }.to raise_error /Something bad/ end + + it "returns empty pipeline when ES client raise BadResponseCodeError in [8]" do + allow(mock_client).to receive(:get).with("/").and_return(es_version_response) + expect(mock_client).to receive(:get).with(system_indices_url_regex).and_raise(bad_response_404) + expect(subject.pipeline_configs).to be_empty + end + + it "returns empty pipeline when ES client raise BadResponseCodeError in [7.9]" do + allow(mock_client).to receive(:get).with("/").and_return(es_version_7_9_response) + expect(mock_client).to receive(:post).with(legacy_api, {}, request_body_string).and_raise(bad_response_404) + expect(subject.pipeline_configs).to be_empty + end end end