From 5dbd9303790b79a70b35a0786970ca962e529b9e Mon Sep 17 00:00:00 2001 From: Florian GAULTIER Date: Mon, 20 Apr 2020 17:36:18 +0200 Subject: [PATCH 1/2] Compression should be done on splitted batches --- docs/index.asciidoc | 127 ++++++++++-------- .../outputs/elasticsearch/http_client.rb | 47 +++---- 2 files changed, 85 insertions(+), 89 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 181944373..8fe769712 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -48,7 +48,7 @@ plugin to version 6.2.5 or higher. If you plan to use the Kibana web interface, use the Elasticsearch output plugin to get your log data into -Elasticsearch. +Elasticsearch. TIP: You can run Elasticsearch on your own hardware, or use our https://www.elastic.co/cloud/elasticsearch-service[hosted {es} Service] on @@ -58,7 +58,7 @@ Elastic Cloud. The Elasticsearch Service is available on both AWS and GCP. This output only speaks the HTTP protocol. HTTP is the preferred protocol for interacting with Elasticsearch as of Logstash 2.0. We strongly encourage the use of HTTP over the node protocol for a number of reasons. HTTP is only marginally slower, yet far easier to administer and work with. When using the HTTP protocol one may upgrade Elasticsearch versions without having -to upgrade Logstash in lock-step. +to upgrade Logstash in lock-step. You can learn more about Elasticsearch at @@ -82,7 +82,7 @@ the new template is installed. [NOTE] ================================================================================ -You cannot use dynamic variable substitution when `ilm_enabled` is `true` and +You cannot use dynamic variable substitution when `ilm_enabled` is `true` and when using `ilm_rollover_alias`. ================================================================================ @@ -106,7 +106,7 @@ Example: index => "%{[some_field][sub_field]}-%{+YYYY.MM.dd}" } } - + **What to do in case there is no field in the event containing the destination index prefix?** You can use the `mutate` filter and conditionals to add a `[@metadata]` field (see https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#metadata) to set @@ -142,7 +142,7 @@ HTTP requests to the bulk API are expected to return a 200 response code. All ot The following document errors are handled as follows: * 400 and 404 errors are sent to the dead letter queue (DLQ), if enabled. If a DLQ is not enabled, a log message will be emitted, and the event will be dropped. See <> for more info. - * 409 errors (conflict) are logged as a warning and dropped. + * 409 errors (conflict) are logged as a warning and dropped. Note that 409 exceptions are no longer retried. Please set a higher `retry_on_conflict` value if you experience 409 exceptions. It is more performant for Elasticsearch to retry these exceptions than this plugin. @@ -209,7 +209,9 @@ NOTE: If the index property is supplied in the output definition, it will be ove ==== Batch Sizes This plugin attempts to send batches of events as a single request. However, if -a request exceeds 20MB we will break it up into multiple batch requests. If a single document exceeds 20MB it will be sent as a single request. +a request exceeds `http_max_content_length` we will break it up into multiple batch requests. If a single document exceeds this length it will be sent as a single request. + +Compression is done on each batch as elasticsearch `http.max_content_length` is validated on uncompressed data. ==== DNS Caching @@ -224,12 +226,12 @@ not reevaluate its DNS value while the keepalive is in effect. ==== HTTP Compression -This plugin supports request and response compression. Response compression is enabled by default and -for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for -it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http[in +This plugin supports request and response compression. Response compression is enabled by default and +for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for +it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http[in Elasticsearch] to take advantage of response compression when using this plugin -For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression` +For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression` setting in their Logstash config file. @@ -254,6 +256,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>, one of `["true", "false", "auto"]`|No | <> |<>|No | <> |<>|No @@ -304,7 +307,7 @@ output plugins.   [id="plugins-{type}s-{plugin}-action"] -===== `action` +===== `action` * Value type is <> * Default value is `"index"` @@ -325,7 +328,7 @@ The Elasticsearch action to perform. Valid actions are: For more details on actions, check out the http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk API documentation] [id="plugins-{type}s-{plugin}-bulk_path"] -===== `bulk_path` +===== `bulk_path` * Value type is <> * There is no default value for this setting. @@ -334,7 +337,7 @@ HTTP Path to perform the _bulk requests to this defaults to a concatenation of the path parameter and "_bulk" [id="plugins-{type}s-{plugin}-cacert"] -===== `cacert` +===== `cacert` * Value type is <> * There is no default value for this setting. @@ -362,7 +365,7 @@ Cloud ID, from the Elastic Cloud web console. If set `hosts` should not be used. For more details, check out the https://www.elastic.co/guide/en/logstash/current/connecting-to-cloud.html#_cloud_id[Logstash-to-Cloud documentation] [id="plugins-{type}s-{plugin}-doc_as_upsert"] -===== `doc_as_upsert` +===== `doc_as_upsert` * Value type is <> * Default value is `false` @@ -371,7 +374,7 @@ Enable `doc_as_upsert` for update mode. Create a new document with source if `document_id` doesn't exist in Elasticsearch [id="plugins-{type}s-{plugin}-document_id"] -===== `document_id` +===== `document_id` * Value type is <> * There is no default value for this setting. @@ -379,7 +382,7 @@ Create a new document with source if `document_id` doesn't exist in Elasticsearc The document ID for the index. Useful for overwriting existing entries in Elasticsearch with the same ID. [id="plugins-{type}s-{plugin}-document_type"] -===== `document_type` +===== `document_type` * Value type is <> * There is no default value for this setting. @@ -400,7 +403,7 @@ If you don't set a value for this option: - for elasticsearch clusters 5.x and below: the event's 'type' field will be used, if the field is not present the value of 'doc' will be used. [id="plugins-{type}s-{plugin}-failure_type_logging_whitelist"] -===== `failure_type_logging_whitelist` +===== `failure_type_logging_whitelist` * Value type is <> * Default value is `[]` @@ -421,7 +424,7 @@ an elasticsearch node. The headers will be used for any kind of request These custom headers will be overidden by settings like `http_compression`. [id="plugins-{type}s-{plugin}-healthcheck_path"] -===== `healthcheck_path` +===== `healthcheck_path` * Value type is <> * There is no default value for this setting. @@ -432,7 +435,7 @@ before it is once again eligible to service requests. If you have custom firewall rules you may need to change this [id="plugins-{type}s-{plugin}-hosts"] -===== `hosts` +===== `hosts` * Value type is <> * Default value is `[//127.0.0.1]` @@ -450,13 +453,21 @@ to prevent LS from sending bulk requests to the master nodes. So this parameter Any special characters present in the URLs here MUST be URL escaped! This means `#` should be put in as `%23` for instance. [id="plugins-{type}s-{plugin}-http_compression"] -===== `http_compression` +===== `http_compression` * Value type is <> * Default value is `false` Enable gzip compression on requests. Note that response compression is on by default for Elasticsearch v5.0 and beyond +[id="plugins-{type}s-{plugin}-http_max_content_length"] +===== `http_max_content_length` + + * Value type is <> + * Default value is `104857600` (100 * 1024 * 1024 == 100MiB) + +The max content of an HTTP should follow elasticsearch config (https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html). + [id="plugins-{type}s-{plugin}-ilm_enabled"] ===== `ilm_enabled` @@ -513,7 +524,7 @@ NOTE: Updating the rollover alias will require the index template to be rewritte NOTE: `ilm_rollover_alias` does NOT support dynamic variable substitution as `index` does. [id="plugins-{type}s-{plugin}-index"] -===== `index` +===== `index` * Value type is <> * Default value is `"logstash-%{+yyyy.MM.dd}"` @@ -527,7 +538,7 @@ LS uses Joda to format the index pattern from event timestamp. Joda formats are defined http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html[here]. [id="plugins-{type}s-{plugin}-keystore"] -===== `keystore` +===== `keystore` * Value type is <> * There is no default value for this setting. @@ -536,7 +547,7 @@ The keystore used to present a certificate to the server. It can be either .jks or .p12 [id="plugins-{type}s-{plugin}-keystore_password"] -===== `keystore_password` +===== `keystore_password` * Value type is <> * There is no default value for this setting. @@ -544,7 +555,7 @@ It can be either .jks or .p12 Set the keystore password [id="plugins-{type}s-{plugin}-manage_template"] -===== `manage_template` +===== `manage_template` * Value type is <> * Default value is `true` @@ -563,7 +574,7 @@ field names) you should set `manage_template` to false and use the REST API to apply your templates manually. [id="plugins-{type}s-{plugin}-parameters"] -===== `parameters` +===== `parameters` * Value type is <> * There is no default value for this setting. @@ -573,7 +584,7 @@ to every host listed in the 'hosts' configuration. If the 'hosts' list contains urls that already have query strings, the one specified here will be appended. [id="plugins-{type}s-{plugin}-parent"] -===== `parent` +===== `parent` * Value type is <> * Default value is `nil` @@ -582,7 +593,7 @@ For child documents, ID of the associated parent. This can be dynamic using the `%{foo}` syntax. [id="plugins-{type}s-{plugin}-password"] -===== `password` +===== `password` * Value type is <> * There is no default value for this setting. @@ -590,7 +601,7 @@ This can be dynamic using the `%{foo}` syntax. Password to authenticate to a secure Elasticsearch cluster [id="plugins-{type}s-{plugin}-path"] -===== `path` +===== `path` * Value type is <> * There is no default value for this setting. @@ -601,7 +612,7 @@ Note that if you use paths as components of URLs in the 'hosts' field you may not also set this field. That will raise an error at startup [id="plugins-{type}s-{plugin}-pipeline"] -===== `pipeline` +===== `pipeline` * Value type is <> * Default value is `nil` @@ -610,7 +621,7 @@ Set which ingest pipeline you wish to execute for an event. You can also use eve here like `pipeline => "%{INGEST_PIPELINE}"` [id="plugins-{type}s-{plugin}-pool_max"] -===== `pool_max` +===== `pool_max` * Value type is <> * Default value is `1000` @@ -621,7 +632,7 @@ Setting this too low may mean frequently closing / opening connections which is bad. [id="plugins-{type}s-{plugin}-pool_max_per_route"] -===== `pool_max_per_route` +===== `pool_max_per_route` * Value type is <> * Default value is `100` @@ -632,7 +643,7 @@ Setting this too low may mean frequently closing / opening connections which is bad. [id="plugins-{type}s-{plugin}-proxy"] -===== `proxy` +===== `proxy` * Value type is <> * There is no default value for this setting. @@ -643,7 +654,7 @@ An empty string is treated as if proxy was not set. This is useful when using environment variables e.g. `proxy => '${LS_PROXY:}'`. [id="plugins-{type}s-{plugin}-resurrect_delay"] -===== `resurrect_delay` +===== `resurrect_delay` * Value type is <> * Default value is `5` @@ -653,7 +664,7 @@ Resurrection is the process by which backend endpoints marked 'down' are checked to see if they have come back to life [id="plugins-{type}s-{plugin}-retry_initial_interval"] -===== `retry_initial_interval` +===== `retry_initial_interval` * Value type is <> * Default value is `2` @@ -661,7 +672,7 @@ to see if they have come back to life Set initial interval in seconds between bulk retries. Doubled on each retry up to `retry_max_interval` [id="plugins-{type}s-{plugin}-retry_max_interval"] -===== `retry_max_interval` +===== `retry_max_interval` * Value type is <> * Default value is `64` @@ -669,7 +680,7 @@ Set initial interval in seconds between bulk retries. Doubled on each retry up t Set max interval in seconds between bulk retries. [id="plugins-{type}s-{plugin}-retry_on_conflict"] -===== `retry_on_conflict` +===== `retry_on_conflict` * Value type is <> * Default value is `1` @@ -679,7 +690,7 @@ See the https://www.elastic.co/guide/en/elasticsearch/guide/current/partial-upda for more info [id="plugins-{type}s-{plugin}-routing"] -===== `routing` +===== `routing` * Value type is <> * There is no default value for this setting. @@ -688,7 +699,7 @@ A routing override to be applied to all processed events. This can be dynamic using the `%{foo}` syntax. [id="plugins-{type}s-{plugin}-script"] -===== `script` +===== `script` * Value type is <> * Default value is `""` @@ -704,7 +715,7 @@ Example: } [id="plugins-{type}s-{plugin}-script_lang"] -===== `script_lang` +===== `script_lang` * Value type is <> * Default value is `"painless"` @@ -713,7 +724,7 @@ Set the language of the used script. If not set, this defaults to painless in ES When using indexed (stored) scripts on Elasticsearch 6 and higher, you must set this parameter to `""` (empty string). [id="plugins-{type}s-{plugin}-script_type"] -===== `script_type` +===== `script_type` * Value can be any of: `inline`, `indexed`, `file` * Default value is `["inline"]` @@ -724,7 +735,7 @@ Define the type of script referenced by "script" variable file : "script" contains the name of script stored in elasticsearch's config directory [id="plugins-{type}s-{plugin}-script_var_name"] -===== `script_var_name` +===== `script_var_name` * Value type is <> * Default value is `"event"` @@ -732,7 +743,7 @@ Define the type of script referenced by "script" variable Set variable name passed to script (scripted update) [id="plugins-{type}s-{plugin}-scripted_upsert"] -===== `scripted_upsert` +===== `scripted_upsert` * Value type is <> * Default value is `false` @@ -740,7 +751,7 @@ Set variable name passed to script (scripted update) if enabled, script is in charge of creating non-existent document (scripted update) [id="plugins-{type}s-{plugin}-sniffing"] -===== `sniffing` +===== `sniffing` * Value type is <> * Default value is `false` @@ -750,7 +761,7 @@ For Elasticsearch 1.x and 2.x any nodes with `http.enabled` (on by default) will For Elasticsearch 5.x and 6.x any nodes with `http.enabled` (on by default) will be added to the hosts list, excluding master-only nodes. [id="plugins-{type}s-{plugin}-sniffing_delay"] -===== `sniffing_delay` +===== `sniffing_delay` * Value type is <> * Default value is `5` @@ -758,7 +769,7 @@ For Elasticsearch 5.x and 6.x any nodes with `http.enabled` (on by default) will How long to wait, in seconds, between sniffing attempts [id="plugins-{type}s-{plugin}-sniffing_path"] -===== `sniffing_path` +===== `sniffing_path` * Value type is <> * There is no default value for this setting. @@ -769,7 +780,7 @@ if sniffing_path is set it will be used as an absolute path do not use full URL here, only paths, e.g. "/sniff/_nodes/http" [id="plugins-{type}s-{plugin}-ssl"] -===== `ssl` +===== `ssl` * Value type is <> * There is no default value for this setting. @@ -779,7 +790,7 @@ is specified in the URLs listed in 'hosts'. If no explicit protocol is specified If SSL is explicitly disabled here the plugin will refuse to start if an HTTPS URL is given in 'hosts' [id="plugins-{type}s-{plugin}-ssl_certificate_verification"] -===== `ssl_certificate_verification` +===== `ssl_certificate_verification` * Value type is <> * Default value is `true` @@ -789,7 +800,7 @@ For more information on disabling certificate verification please read https://www.cs.utexas.edu/~shmat/shmat_ccs12.pdf [id="plugins-{type}s-{plugin}-template"] -===== `template` +===== `template` * Value type is <> * There is no default value for this setting. @@ -798,7 +809,7 @@ You can set the path to your own template here, if you so desire. If not set, the included template will be used. [id="plugins-{type}s-{plugin}-template_name"] -===== `template_name` +===== `template_name` * Value type is <> * Default value is `"logstash"` @@ -812,7 +823,7 @@ change this, you will need to prune the old template manually, e.g. where `OldTemplateName` is whatever the former setting was. [id="plugins-{type}s-{plugin}-template_overwrite"] -===== `template_overwrite` +===== `template_overwrite` * Value type is <> * Default value is `false` @@ -829,7 +840,7 @@ template (logstash), setting this to true will make Logstash to overwrite the "logstash" template (i.e. removing all customized settings) [id="plugins-{type}s-{plugin}-timeout"] -===== `timeout` +===== `timeout` * Value type is <> * Default value is `60` @@ -838,7 +849,7 @@ Set the timeout, in seconds, for network operations and requests sent Elasticsea a timeout occurs, the request will be retried. [id="plugins-{type}s-{plugin}-truststore"] -===== `truststore` +===== `truststore` * Value type is <> * There is no default value for this setting. @@ -848,7 +859,7 @@ It can be either .jks or .p12. Use either `:truststore` or `:cacert`. [id="plugins-{type}s-{plugin}-truststore_password"] -===== `truststore_password` +===== `truststore_password` * Value type is <> * There is no default value for this setting. @@ -856,7 +867,7 @@ Use either `:truststore` or `:cacert`. Set the truststore password [id="plugins-{type}s-{plugin}-upsert"] -===== `upsert` +===== `upsert` * Value type is <> * Default value is `""` @@ -865,7 +876,7 @@ Set upsert content for update mode. Create a new document with this parameter as json string if `document_id` doesn't exists [id="plugins-{type}s-{plugin}-user"] -===== `user` +===== `user` * Value type is <> * There is no default value for this setting. @@ -873,7 +884,7 @@ Create a new document with this parameter as json string if `document_id` doesn' Username to authenticate to a secure Elasticsearch cluster [id="plugins-{type}s-{plugin}-validate_after_inactivity"] -===== `validate_after_inactivity` +===== `validate_after_inactivity` * Value type is <> * Default value is `10000` @@ -888,7 +899,7 @@ have become stale (half-closed) while kept inactive in the pool.' See https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.html#setValidateAfterInactivity(int)[these docs for more info] [id="plugins-{type}s-{plugin}-version"] -===== `version` +===== `version` * Value type is <> * There is no default value for this setting. @@ -897,7 +908,7 @@ The version to use for indexing. Use sprintf syntax like `%{my_version}` to use See https://www.elastic.co/blog/elasticsearch-versioning-support. [id="plugins-{type}s-{plugin}-version_type"] -===== `version_type` +===== `version_type` * Value can be any of: `internal`, `external`, `external_gt`, `external_gte`, `force` * There is no default value for this setting. diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index 32a37e82a..feae9439d 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -8,21 +8,6 @@ require 'stringio' module LogStash; module Outputs; class ElasticSearch; - # This is a constant instead of a config option because - # there really isn't a good reason to configure it. - # - # The criteria used are: - # 1. We need a number that's less than 100MiB because ES - # won't accept bulks larger than that. - # 2. It must be large enough to amortize the connection constant - # across multiple requests. - # 3. It must be small enough that even if multiple threads hit this size - # we won't use a lot of heap. - # - # We wound up agreeing that a number greater than 10 MiB and less than 100MiB - # made sense. We picked one on the lowish side to not use too much heap. - TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB - class HttpClient attr_reader :client, :options, :logger, :pool, :action_count, :recv_count # This is here in case we use DEFAULT_OPTIONS in the future @@ -107,26 +92,19 @@ def bulk(actions) end body_stream = StringIO.new - if http_compression - body_stream.set_encoding "BINARY" - stream_writer = Zlib::GzipWriter.new(body_stream, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY) - else - stream_writer = body_stream - end bulk_responses = [] bulk_actions.each do |action| as_json = action.is_a?(Array) ? action.map {|line| LogStash::Json.dump(line)}.join("\n") : LogStash::Json.dump(action) as_json << "\n" - if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES + if (body_stream.size + as_json.bytesize) > http_max_content_length bulk_responses << bulk_send(body_stream) unless body_stream.size == 0 end - stream_writer.write(as_json) + body_stream.write(as_json) end - stream_writer.close if http_compression bulk_responses << bulk_send(body_stream) if body_stream.size > 0 - body_stream.close if !http_compression + body_stream.close join_bulk_responses(bulk_responses) end @@ -139,8 +117,9 @@ def join_bulk_responses(bulk_responses) def bulk_send(body_stream) params = http_compression ? {:headers => {"Content-Encoding" => "gzip"}} : {} + body = http_compression ? Zlib::gzip(body_stream.string) : body_stream.string # Discard the URL - response = @pool.post(@bulk_path, params, body_stream.string) + response = @pool.post(@bulk_path, params, body) if !body_stream.closed? body_stream.truncate(0) body_stream.seek(0) @@ -215,7 +194,7 @@ def scheme else nil end - + calculated_scheme = calculate_property(uris, :scheme, explicit_scheme, sniffing) if calculated_scheme && calculated_scheme !~ /https?/ @@ -235,7 +214,7 @@ def port # Enter things like foo:123, bar and wind up with foo:123, bar:9200 calculate_property(uris, :port, nil, sniffing) || 9200 end - + def uris @options[:hosts] end @@ -252,9 +231,15 @@ def http_compression client_settings.fetch(:http_compression, false) end + def http_max_content_length + # Follow elasticsearch default http.max_content_length + # https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html + client_settings.fetch(:max_content_length, 104857600) # 100 * 1024 * 1024 == 100MiB + end + def build_adapter(options) timeout = options[:timeout] || 0 - + adapter_options = { :socket_timeout => timeout, :request_timeout => timeout, @@ -281,7 +266,7 @@ def build_adapter(options) adapter_class = ::LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter adapter = adapter_class.new(@logger, adapter_options) end - + def build_pool(options) adapter = build_adapter(options) @@ -331,7 +316,7 @@ def host_to_url(h) h.query end prefixed_raw_query = raw_query && !raw_query.empty? ? "?#{raw_query}" : nil - + raw_url = "#{raw_scheme}://#{postfixed_userinfo}#{raw_host}:#{raw_port}#{prefixed_raw_path}#{prefixed_raw_query}" ::LogStash::Util::SafeURI.new(raw_url) From 328b101780c590d2e8b2c31ec4f96ec26b0f1e5f Mon Sep 17 00:00:00 2001 From: Florian GAULTIER Date: Mon, 20 Apr 2020 19:58:54 +0200 Subject: [PATCH 2/2] Fix specs --- spec/integration/outputs/index_spec.rb | 29 +++++++++---------- .../outputs/elasticsearch/http_client_spec.rb | 26 ++++++++--------- 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/spec/integration/outputs/index_spec.rb b/spec/integration/outputs/index_spec.rb index 710ba653e..52b12ccbe 100644 --- a/spec/integration/outputs/index_spec.rb +++ b/spec/integration/outputs/index_spec.rb @@ -1,8 +1,7 @@ require_relative "../../../spec/es_spec_helper" require "logstash/outputs/elasticsearch" -describe "TARGET_BULK_BYTES", :integration => true do - let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES } +describe "http_max_content_length", :integration => true do let(:event_count) { 1000 } let(:events) { event_count.times.map { event }.to_a } let(:config) { @@ -23,11 +22,11 @@ end describe "batches that are too large for one" do - let(:event) { LogStash::Event.new("message" => "a " * (((target_bulk_bytes/2) / event_count)+1)) } + let(:event) { LogStash::Event.new("message" => "a " * (((subject.client.http_max_content_length/2) / event_count)+1)) } it "should send in two batches" do expect(subject.client).to have_received(:bulk_send).twice do |payload| - expect(payload.size).to be <= target_bulk_bytes + expect(payload.size).to be <= subject.client.http_max_content_length end end @@ -38,7 +37,7 @@ it "should send in one batch" do expect(subject.client).to have_received(:bulk_send).once do |payload| - expect(payload.size).to be <= target_bulk_bytes + expect(payload.size).to be <= subject.client.http_max_content_length end end end @@ -53,7 +52,7 @@ let(:config) { "not implemented" } let(:events) { event_count.times.map { event }.to_a } subject { LogStash::Outputs::ElasticSearch.new(config) } - + let(:es_url) { "http://#{get_host_port}" } let(:index_url) {"#{es_url}/#{index}"} let(:http_client_options) { {} } @@ -65,7 +64,7 @@ subject.register subject.multi_receive([]) end - + shared_examples "an indexer" do |secure| it "ships events" do subject.multi_receive(events) @@ -85,13 +84,13 @@ expect(doc["_index"]).to eq(index) end end - + it "sets the correct content-type header" do expected_manticore_opts = {:headers => {"Content-Type" => "application/json"}, :body => anything} if secure expected_manticore_opts = { - :headers => {"Content-Type" => "application/json"}, - :body => anything, + :headers => {"Content-Type" => "application/json"}, + :body => anything, :auth => { :user => user, :password => password, @@ -146,7 +145,7 @@ :auth => { :user => user, :password => password - }, + }, :ssl => { :enabled => true, :ca_file => cacert @@ -154,14 +153,14 @@ } end it_behaves_like("an indexer", true) - + describe "with a password requiring escaping" do let(:user) { "f@ncyuser" } let(:password) { "ab%12#" } - + include_examples("an indexer", true) end - + describe "with a user/password requiring escaping in the URL" do let(:config) do { @@ -171,7 +170,7 @@ "index" => index } end - + include_examples("an indexer", true) end end diff --git a/spec/unit/outputs/elasticsearch/http_client_spec.rb b/spec/unit/outputs/elasticsearch/http_client_spec.rb index efb7ca7f7..f4e9c4e49 100644 --- a/spec/unit/outputs/elasticsearch/http_client_spec.rb +++ b/spec/unit/outputs/elasticsearch/http_client_spec.rb @@ -30,7 +30,7 @@ let(:http_hostname_port) { ::LogStash::Util::SafeURI.new("http://#{hostname_port}") } let(:https_hostname_port) { ::LogStash::Util::SafeURI.new("https://#{hostname_port}") } let(:http_hostname_port_path) { ::LogStash::Util::SafeURI.new("http://#{hostname_port}/path") } - + shared_examples("proper host handling") do it "should properly transform a host:port string to a URL" do expect(subject.host_to_url(hostname_port_uri).to_s).to eq(http_hostname_port.to_s + "/") @@ -59,7 +59,7 @@ context "when SSL is false" do let(:ssl) { false } let(:base_options) { super.merge(:hosts => [https_hostname_port]) } - + it "should refuse to handle an https url" do expect { subject.host_to_url(https_hostname_port) @@ -73,13 +73,13 @@ subject expect(subject.host_to_url(https_hostname_port).to_s).to eq(https_hostname_port.to_s + "/") end - end + end end describe "path" do let(:url) { http_hostname_port_path } let(:base_options) { super.merge(:hosts => [url]) } - + it "should allow paths in a url" do expect(subject.host_to_url(url)).to eq(url) end @@ -93,12 +93,12 @@ }.to raise_error(LogStash::ConfigurationError) end end - + context "with a path missing a leading /" do let(:url) { http_hostname_port } let(:base_options) { super.merge(:client_settings => {:path => "otherpath"}) } - - + + it "should automatically insert a / in front of path overlays" do expected = url.clone expected.path = url.path + "/otherpath" @@ -190,14 +190,13 @@ ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message}], ]} - context "if a message is over TARGET_BULK_BYTES" do - let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES } - let(:message) { "a" * (target_bulk_bytes + 1) } + context "if a message is over http_max_content_length" do + let(:message) { "a" * (subject.http_max_content_length + 1) } it "should be handled properly" do allow(subject).to receive(:join_bulk_responses) expect(subject).to receive(:bulk_send).once do |data| - expect(data.size).to be > target_bulk_bytes + expect(data.size).to be > subject.http_max_content_length end s = subject.send(:bulk, actions) end @@ -216,9 +215,8 @@ s = subject.send(:bulk, actions) end - context "if one exceeds TARGET_BULK_BYTES" do - let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES } - let(:message1) { "a" * (target_bulk_bytes + 1) } + context "if one exceeds http_max_content_length" do + let(:message1) { "a" * (subject.http_max_content_length + 1) } it "executes two bulk_send operations" do allow(subject).to receive(:join_bulk_responses) expect(subject).to receive(:bulk_send).twice