@khushijain21
Contributor

@khushijain21 khushijain21 commented Oct 22, 2025

Proposed commit message

This PR adds support for the output.[elasticsearch].pipelines parameter in beatreceivers. Reference: https://www.elastic.co/docs/reference/fleet/elasticsearch-output#output-elasticsearch-data-parsing-settings

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in ./changelog/fragments using the changelog tool.

Disruptive User Impact

This requires the pipelines config to be set under otelconsumer. The final beatreceiver config will now look like this:

filebeatreceiver:
  filebeat:
    inputs:
      ...
    output:
      otelconsumer:
        pipelines:
          - pipeline: "warning_pipeline"
            when.contains:
              message: "WARN"
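
To make the intended routing concrete, here is a minimal Go sketch of how a list of `pipelines` rules could be evaluated. It is not the libbeat or otelconsumer implementation: the `rule` type and the `selectPipeline` and `matches` helpers are hypothetical names, only the `when.contains` condition is modeled, and it assumes the first matching rule wins, as the referenced Elasticsearch output documentation describes. The second rule is borrowed from the test configuration further down in this description.

```go
package main

import (
	"fmt"
	"strings"
)

// rule is a hypothetical stand-in for one entry of the `pipelines` list:
// a pipeline name plus a `when.contains` condition (field -> substring).
type rule struct {
	pipeline string
	contains map[string]string
}

// matches reports whether every `contains` condition of the rule holds
// for the event. A rule without conditions matches every event.
func matches(r rule, event map[string]string) bool {
	for field, substr := range r.contains {
		value, ok := event[field]
		if !ok || !strings.Contains(value, substr) {
			return false
		}
	}
	return true
}

// selectPipeline returns the pipeline of the first matching rule, or
// fallback when no rule matches (assumption: rules are tried in order).
func selectPipeline(rules []rule, event map[string]string, fallback string) string {
	for _, r := range rules {
		if matches(r, event) {
			return r.pipeline
		}
	}
	return fallback
}

func main() {
	rules := []rule{
		{pipeline: "warning_pipeline", contains: map[string]string{"message": "WARN"}},
		{pipeline: "error_pipeline", contains: map[string]string{"message": "ERR"}},
	}
	for _, msg := range []string{"WARN disk almost full", "ERR disk full", "INFO started"} {
		event := map[string]string{"message": msg}
		fmt.Printf("%q -> %q\n", msg, selectPipeline(rules, event, ""))
	}
}
```

An empty result here stands for "no pipeline selected"; what the real code does in that case is not covered by this sketch.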

Author's Checklist

  • [ ]

How to test this PR locally

  1. Create two pipelines (warning_pipeline and error_pipeline) in Elasticsearch. This can be done via Kibana's Dev Tools, for example:
PUT _ingest/pipeline/error_pipeline
{
  "description": "Add timestamp and remove unwanted fields",
  "processors": [
    {
      "set": {
        "field": "error_key",
        "value": "this is error value"
      }
    }
  ]
}
  2. Start filebeat otel with the following config file
filebeat.inputs:
  - type: filestream
    id: filestream-input-id
    enabled: true
    file_identity.native: ~
    prospector.scanner.fingerprint.enabled: false    
    paths:
      - ./test.json

output:
  elasticsearch:
    hosts: ["http://localhost:9200"]
    username: admin
    password: testing
    pipelines:
      - pipeline: "warning_pipeline"
        when.contains:
          message: "WARN"
      - pipeline: "error_pipeline"
        when.contains:
          message: "ERR"

You can then observe that the final events routed through error_pipeline have an extra field, error_key: "this is error value".

Related issues

@khushijain21 khushijain21 requested a review from a team as a code owner October 22, 2025 10:17
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Oct 22, 2025
@khushijain21 khushijain21 marked this pull request as draft October 22, 2025 10:17
@github-actions
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

@mergify
Contributor

mergify bot commented Oct 22, 2025

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @khushijain21? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

return outputs.Fail(err)
}

_, pipelineSelector, err := elasticsearch.BuildSelectors(im, beat, cfg)
Contributor Author

@khushijain21 khushijain21 Oct 22, 2025

We do not use early encoding here, unlike the elasticsearch output (https://github.com/khushijain21/beats/blob/pipelines/libbeat/outputs/elasticsearch/elasticsearch.go#L91), because issue #44105 intends to remove the beat pipeline for beatreceivers.

@khushijain21
Contributor Author

A follow-up PR will be required to update this in elastic-agent. Since some of the libbeat API is now public to allow the pipelines setting, the beats version will have to be updated in elastic-agent first.

@khushijain21 khushijain21 marked this pull request as ready for review October 22, 2025 10:28
@khushijain21 khushijain21 added the Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team label Oct 22, 2025
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Oct 22, 2025
@elasticmachine
Collaborator

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@khushijain21 khushijain21 added needs_team Indicates that the issue/PR needs a Team:* label backport-9.2 Automated backport to the 9.2 branch labels Oct 22, 2025
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Oct 22, 2025
@khushijain21 khushijain21 added needs_team Indicates that the issue/PR needs a Team:* label backport-8.19 Automated backport to the 8.19 branch labels Oct 22, 2025
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Oct 22, 2025
@khushijain21 khushijain21 requested review from leehinman and mauri870 and removed request for andrzej-stencel October 23, 2025 07:12
Contributor

@leehinman leehinman left a comment

What happens if we specify a pipeline in the exporter config with our current integrations? Integrations specify a pipeline per event. Whatever the current behavior is with the process runtime needs to be replicated when we switch to the otel runtime.

@khushijain21
Contributor Author

khushijain21 commented Oct 29, 2025

What happens if we specify a pipeline in the exporter config with our current integrations?

I tested this. For beats, the behavior is:

  • When output.pipeline is set and an integration that already uses a default_pipeline (in this case nginx) is used, the output.pipeline field is ignored. I see that beats adds the pipeline field to the action line of the _bulk API,

i.e. our bulk requests look like

POST _bulk
{ "index" : { "_index" : "test", "_id" : "1", "pipeline" : "error_pipeline" } }
  • Events sent via beatreceivers, on the other hand, go through both the default_pipeline AND the output.pipeline. I see that elasticsearchexporter sets the pipeline as a query parameter on the bulk request,
    i.e. the outgoing bulk request looks like this
POST _bulk/?pipeline=error_pipeline
{ "index" : { "_index" : "test", "_id" : "1" } }

The Elasticsearch documentation does not explicitly specify which takes precedence: the query parameter or the pipeline defined in the action line. I am going to perform some more testing to be sure.
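
To compare the two request shapes side by side, here is a small Go sketch that renders both: the pipeline carried in the action line (the beats behavior described above) and the pipeline carried as a query parameter (the elasticsearchexporter behavior described above). The `actionMeta` type and the `actionLine` and `bulkPath` helpers are hypothetical names for illustration and are not taken from either code base. The sketch uses the `pipeline` metadata key of the Bulk API action line.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// actionMeta models the metadata of a bulk "index" action line.
type actionMeta struct {
	Index    string `json:"_index"`
	ID       string `json:"_id,omitempty"`
	Pipeline string `json:"pipeline,omitempty"`
}

// actionLine renders the action line of a bulk "index" operation.
// An empty pipeline is omitted from the metadata.
func actionLine(index, id, pipeline string) (string, error) {
	meta := map[string]actionMeta{
		"index": {Index: index, ID: id, Pipeline: pipeline},
	}
	b, err := json.Marshal(meta)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// bulkPath returns the _bulk endpoint path, adding the pipeline as a
// query parameter when one is given.
func bulkPath(pipeline string) string {
	if pipeline == "" {
		return "/_bulk"
	}
	q := url.Values{}
	q.Set("pipeline", pipeline)
	return "/_bulk?" + q.Encode()
}

func main() {
	// Shape 1: pipeline in the action line.
	withPipeline, err := actionLine("test", "1", "error_pipeline")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println("POST " + bulkPath(""))
	fmt.Println(withPipeline)

	// Shape 2: pipeline as a query parameter on the request.
	plain, err := actionLine("test", "1", "")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println("POST " + bulkPath("error_pipeline"))
	fmt.Println(plain)
}
```

Sending both shapes for the same document against an index that has a default_pipeline would be one way to carry out the additional testing mentioned above.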

Labels

backport-8.19 Automated backport to the 8.19 branch backport-9.2 Automated backport to the 9.2 branch Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team
