diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index e48644308c495..fb13f03eee232 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -159,6 +159,7 @@ dnsmsg dogstatsd downcasted dsl +dsn dstaddr dstat dstport diff --git a/.github/workflows/changes.yml b/.github/workflows/changes.yml index 1c470d80f5297..49c46da14a555 100644 --- a/.github/workflows/changes.yml +++ b/.github/workflows/changes.yml @@ -112,6 +112,8 @@ on: value: ${{ jobs.int_tests.outputs.pulsar }} redis: value: ${{ jobs.int_tests.outputs.redis }} + sentry: + value: ${{ jobs.int_tests.outputs.sentry }} splunk: value: ${{ jobs.int_tests.outputs.splunk }} webhdfs: @@ -260,7 +262,7 @@ jobs: id: filter with: base: ${{ env.BASE_SHA }} - ref: ${{ env.HEAD_SHA }} + ref: ${{ env.HEAD_SHA }} filters: int_test_filters.yaml # This JSON hack was introduced because GitHub Actions does not support dynamic expressions in the @@ -338,5 +340,5 @@ jobs: id: filter with: base: ${{ env.BASE_SHA }} - ref: ${{ env.HEAD_SHA }} + ref: ${{ env.HEAD_SHA }} filters: int_test_filters.yaml diff --git a/.github/workflows/ci-integration-review.yml b/.github/workflows/ci-integration-review.yml index b8cbb2ace1165..87ff36e0cf390 100644 --- a/.github/workflows/ci-integration-review.yml +++ b/.github/workflows/ci-integration-review.yml @@ -91,7 +91,7 @@ jobs: "datadog-logs", "datadog-metrics", "datadog-traces", "dnstap", "docker-logs", "elasticsearch", "eventstoredb", "fluent", "gcp", "greptimedb", "http-client", "influxdb", "kafka", "logstash", "loki", "mongodb", "nats", "nginx", "opentelemetry", "postgres", "prometheus", "pulsar", - "redis", "splunk", "webhdfs" + "redis", "sentry", "splunk", "webhdfs" ] steps: - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 5d46c5c1db0ab..a2d5878b5f61c 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -76,7 +76,7 @@ jobs: "datadog-logs", "datadog-metrics", "datadog-traces", "dnstap", "docker-logs", "elasticsearch", "eventstoredb", "fluent", "gcp", "greptimedb", "http-client", "influxdb", "kafka", "logstash", "loki", "mongodb", "nats", "nginx", "opentelemetry", "postgres", "prometheus", "pulsar", - "redis", "webhdfs" + "redis", "sentry", "webhdfs" ] timeout-minutes: 90 steps: diff --git a/.github/workflows/semantic.yml b/.github/workflows/semantic.yml index 3fa5e58e6e7ba..cc896c1f94bba 100644 --- a/.github/workflows/semantic.yml +++ b/.github/workflows/semantic.yml @@ -244,6 +244,7 @@ jobs: redis sink sematext_logs sink sematext_metrics sink + sentry sink socket sink splunk_hec sink statsd sink diff --git a/Cargo.lock b/Cargo.lock index 8248a5f51f919..bf03f7d5eccb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -641,7 +641,7 @@ dependencies = [ "regex", "ring", "rustls-native-certs 0.7.0", - "rustls-pemfile 2.1.0", + "rustls-pemfile 2.2.0", "rustls-webpki 0.102.8", "serde", "serde_json", @@ -1841,14 +1841,14 @@ dependencies = [ "pin-project-lite", "rustls 0.23.23", "rustls-native-certs 0.8.1", - "rustls-pemfile 2.1.0", + "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", "serde_derive", "serde_json", "serde_repr", "serde_urlencoded", - "thiserror 2.0.3", + "thiserror 2.0.16", "tokio", "tokio-util", "tower-service", @@ -2452,7 +2452,7 @@ checksum = "f29222b549d4e3ded127989d523da9e928918d0d0d7f7c1690b439d0d538bae9" dependencies = [ "directories", "serde", - "thiserror 2.0.3", + "thiserror 
2.0.16", "toml 0.8.23", ] @@ -3073,11 +3073,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "debugid" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d" +dependencies = [ + "serde", + "uuid", +] + [[package]] name = "der" -version = "0.7.8" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" +checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" dependencies = [ "const-oid", "pem-rfc7468", @@ -4692,7 +4702,7 @@ dependencies = [ "rand 0.9.2", "ring", "rustls-pki-types", - "thiserror 2.0.3", + "thiserror 2.0.16", "time", "tinyvec", "tracing 0.1.41", @@ -6212,7 +6222,7 @@ dependencies = [ "memchr", "serde", "simdutf8", - "thiserror 2.0.3", + "thiserror 2.0.16", ] [[package]] @@ -8265,7 +8275,7 @@ dependencies = [ "newtype-uuid", "quick-xml 0.37.4", "strip-ansi-escapes", - "thiserror 2.0.3", + "thiserror 2.0.16", "uuid", ] @@ -8324,7 +8334,7 @@ dependencies = [ "rustc-hash", "rustls 0.23.23", "socket2 0.5.10", - "thiserror 2.0.3", + "thiserror 2.0.16", "tokio", "tracing 0.1.41", ] @@ -8343,7 +8353,7 @@ dependencies = [ "rustls 0.23.23", "rustls-pki-types", "slab", - "thiserror 2.0.3", + "thiserror 2.0.16", "tinyvec", "tracing 0.1.41", "web-time", @@ -8704,7 +8714,7 @@ checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b" dependencies = [ "getrandom 0.2.15", "libredox", - "thiserror 2.0.3", + "thiserror 2.0.16", ] [[package]] @@ -8889,7 +8899,7 @@ dependencies = [ "quinn", "rustls 0.23.23", "rustls-native-certs 0.8.1", - "rustls-pemfile 2.1.0", + "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", "serde_json", @@ -9133,7 +9143,7 @@ dependencies = [ "futures-util", "log", "rustls-native-certs 0.7.0", - "rustls-pemfile 2.1.0", + "rustls-pemfile 2.2.0", "rustls-webpki 0.102.8", "thiserror 1.0.68", "tokio", @@ -9289,7 +9299,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" dependencies = [ "openssl-probe", - "rustls-pemfile 2.1.0", + "rustls-pemfile 2.2.0", "rustls-pki-types", "schannel", "security-framework 2.10.0", @@ -9318,21 +9328,21 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c333bb734fcdedcea57de1602543590f545f127dc8b533324318fd492c5c70b" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" dependencies = [ - "base64 0.21.7", "rustls-pki-types", ] [[package]] name = "rustls-pki-types" -version = "1.10.1" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2bf47e6ff922db3825eb750c4e2ff784c6ff8fb9e13046ef6a1d1c5401b0b37" +checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" dependencies = [ "web-time", + "zeroize", ] [[package]] @@ -9584,6 +9594,74 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" +[[package]] +name = "sentry" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "989425268ab5c011e06400187eed6c298272f8ef913e49fcadc3fda788b45030" +dependencies = [ + "httpdate", + 
"native-tls", + "reqwest 0.12.9", + "sentry-core", + "sentry-log", + "sentry-tracing", + "tokio", + "ureq", +] + +[[package]] +name = "sentry-core" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deaa38b94e70820ff3f1f9db3c8b0aef053b667be130f618e615e0ff2492cbcc" +dependencies = [ + "rand 0.9.2", + "sentry-types", + "serde", + "serde_json", + "url", +] + +[[package]] +name = "sentry-log" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670f08baf70058926b0fa60c8f10218524ef0cb1a1634b0388a4123bdec6288c" +dependencies = [ + "log", + "sentry-core", +] + +[[package]] +name = "sentry-tracing" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fac841c7050aa73fc2bec8f7d8e9cb1159af0b3095757b99820823f3e54e5080" +dependencies = [ + "bitflags 2.9.0", + "sentry-core", + "tracing-core 0.1.33", + "tracing-subscriber", +] + +[[package]] +name = "sentry-types" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e477f4d4db08ddb4ab553717a8d3a511bc9e81dde0c808c680feacbb8105c412" +dependencies = [ + "debugid", + "hex", + "rand 0.9.2", + "serde", + "serde_json", + "thiserror 2.0.16", + "time", + "url", + "uuid", +] + [[package]] name = "serde" version = "1.0.219" @@ -10180,7 +10258,7 @@ dependencies = [ "serde_json", "sha2", "smallvec", - "thiserror 2.0.3", + "thiserror 2.0.16", "tokio", "tokio-stream", "tracing 0.1.41", @@ -10263,7 +10341,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 2.0.3", + "thiserror 2.0.16", "tracing 0.1.41", "whoami", ] @@ -10301,7 +10379,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 2.0.3", + "thiserror 2.0.16", "tracing 0.1.41", "whoami", ] @@ -10326,7 +10404,7 @@ dependencies = [ "serde", "serde_urlencoded", "sqlx-core", - "thiserror 2.0.3", + "thiserror 2.0.16", "tracing 0.1.41", "url", ] @@ -10624,7 +10702,7 @@ checksum = "495b0abdce3dc1f8fd27240651c9e68890c14e9d9c61527b1ce44d8a5a7bd3d5" dependencies = [ "cfg-if", "native-tls", - "rustls-pemfile 2.1.0", + "rustls-pemfile 2.2.0", ] [[package]] @@ -10715,11 +10793,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.3" +version = "2.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" +checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0" dependencies = [ - "thiserror-impl 2.0.3", + "thiserror-impl 2.0.16", ] [[package]] @@ -10735,9 +10813,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.3" +version = "2.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" +checksum = "6c5e1be1c48b9172ee610da68fd9cd2770e7a4056cb3fc98710ee6906f0c7960" dependencies = [ "proc-macro2 1.0.101", "quote 1.0.40", @@ -11181,7 +11259,7 @@ dependencies = [ "pin-project", "prost 0.12.6", "rustls-native-certs 0.7.0", - "rustls-pemfile 2.1.0", + "rustls-pemfile 2.2.0", "rustls-pki-types", "tokio", "tokio-rustls 0.25.0", @@ -11887,6 +11965,36 @@ dependencies = [ "typenum", ] +[[package]] +name = "ureq" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00432f493971db5d8e47a65aeb3b02f8226b9b11f1450ff86bb772776ebadd70" +dependencies = [ + "base64 0.22.1", + 
"der", + "log", + "native-tls", + "percent-encoding", + "rustls-pemfile 2.2.0", + "rustls-pki-types", + "ureq-proto", + "utf-8", + "webpki-root-certs", +] + +[[package]] +name = "ureq-proto" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbe120bb823a0061680e66e9075942fcdba06d46551548c2c259766b9558bc9a" +dependencies = [ + "base64 0.22.1", + "http 1.3.1", + "httparse", + "log", +] + [[package]] name = "url" version = "2.5.4" @@ -12159,6 +12267,7 @@ dependencies = [ "rumqttc", "seahash", "semver 1.0.26", + "sentry", "serde", "serde-toml-merge", "serde_bytes", @@ -12664,7 +12773,7 @@ dependencies = [ "strip-ansi-escapes", "syslog_loose 0.22.0", "termcolor", - "thiserror 2.0.3", + "thiserror 2.0.16", "tokio", "tracing 0.1.41", "ua-parser", @@ -12917,6 +13026,15 @@ dependencies = [ "web-sys", ] +[[package]] +name = "webpki-root-certs" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e4ffd8df1c57e87c325000a3d6ef93db75279dc3a231125aac571650f22b12a" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "webpki-roots" version = "0.25.2" diff --git a/Cargo.toml b/Cargo.toml index 2ec3bff5fb8cc..dbe11bb89dd2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -335,6 +335,9 @@ hex = { version = "0.4.3", default-features = false, optional = true } # GreptimeDB greptimedb-ingester = { git = "https://github.com/GreptimeTeam/greptimedb-ingester-rust", rev = "f7243393808640f5123b0d5b7b798da591a4df6e", optional = true } +# Sentry +sentry = { version = "0.42.0", default-features = false, optional = true, features = ["reqwest", "transport", "logs"] } + # External libs arc-swap = { version = "1.7", default-features = false, optional = true } async-compression = { version = "0.4.27", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true } @@ -795,6 +798,7 @@ sinks-logs = [ "sinks-pulsar", "sinks-redis", "sinks-sematext", + "sinks-sentry", "sinks-socket", "sinks-splunk_hec", "sinks-vector", @@ -865,6 +869,7 @@ sinks-postgres = ["dep:sqlx"] sinks-pulsar = ["dep:apache-avro", "dep:pulsar"] sinks-redis = ["dep:redis"] sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"] +sinks-sentry = ["dep:sentry"] sinks-socket = ["sinks-utils-udp"] sinks-splunk_hec = [] sinks-statsd = ["sinks-utils-udp", "tokio-util/net"] @@ -917,6 +922,7 @@ all-integration-tests = [ "redis-integration-tests", "splunk-integration-tests", "webhdfs-integration-tests", + "sentry-integration-tests", ] amqp-integration-tests = ["sources-amqp", "sinks-amqp"] @@ -983,6 +989,7 @@ redis-integration-tests = ["sinks-redis", "sources-redis"] splunk-integration-tests = ["sinks-splunk_hec"] dnstap-integration-tests = ["sources-dnstap", "dep:bollard"] webhdfs-integration-tests = ["sinks-webhdfs"] +sentry-integration-tests = ["sinks-sentry"] disable-resolv-conf = [] shutdown-tests = ["api", "sinks-blackhole", "sinks-console", "sinks-prometheus", "sources", "transforms-lua", "transforms-remap", "unix"] cli-tests = ["sinks-blackhole", "sinks-socket", "sources-demo_logs", "sources-file"] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 36943f20cb2e0..71c08d0efae4d 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -185,6 +185,7 @@ data-url,https://github.com/servo/rust-url,MIT OR Apache-2.0,Simon Sapin dbl,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers deadpool,https://github.com/bikeshedder/deadpool,MIT OR 
Apache-2.0,Michael P. Jung +debugid,https://github.com/getsentry/rust-debugid,Apache-2.0,Sentry der,https://github.com/RustCrypto/formats/tree/master/der,Apache-2.0 OR MIT,RustCrypto Developers deranged,https://github.com/jhpratt/deranged,MIT OR Apache-2.0,Jacob Pratt derivative,https://github.com/mcarton/rust-derivative,MIT OR Apache-2.0,mcarton @@ -587,6 +588,11 @@ security-framework,https://github.com/kornelski/rust-security-framework,MIT OR A semver,https://github.com/dtolnay/semver,MIT OR Apache-2.0,David Tolnay semver,https://github.com/steveklabnik/semver,MIT OR Apache-2.0,"Steve Klabnik , The Rust Project Developers" semver-parser,https://github.com/steveklabnik/semver-parser,MIT OR Apache-2.0,Steve Klabnik +sentry,https://github.com/getsentry/sentry-rust,MIT,Sentry +sentry-core,https://github.com/getsentry/sentry-rust,MIT,Sentry +sentry-log,https://github.com/getsentry/sentry-rust,MIT,Sentry +sentry-tracing,https://github.com/getsentry/sentry-rust,MIT,Sentry +sentry-types,https://github.com/getsentry/sentry-rust,MIT,Sentry serde,https://github.com/serde-rs/serde,MIT OR Apache-2.0,"Erick Tryzelaar , David Tolnay " serde-toml-merge,https://github.com/jdrouet/serde-toml-merge,MIT,Jeremie Drouet serde-value,https://github.com/arcnmx/serde-value,MIT,arcnmx @@ -716,6 +722,8 @@ unreachable,https://github.com/reem/rust-unreachable,MIT OR Apache-2.0,Jonatha unsafe-libyaml,https://github.com/dtolnay/unsafe-libyaml,MIT,David Tolnay untrusted,https://github.com/briansmith/untrusted,ISC,Brian Smith uom,https://github.com/iliekturtles/uom,Apache-2.0 OR MIT,Mike Boutin +ureq,https://github.com/algesten/ureq,MIT OR Apache-2.0,"Martin Algesten , Jacob Hoffman-Andrews " +ureq-proto,https://github.com/algesten/ureq-proto,MIT OR Apache-2.0,Martin Algesten url,https://github.com/servo/rust-url,MIT OR Apache-2.0,The rust-url developers urlencoding,https://github.com/kornelski/rust_urlencoding,MIT,"Kornel , Bertram Truong " utf-8,https://github.com/SimonSapin/rust-utf8,MIT OR Apache-2.0,Simon Sapin @@ -748,6 +756,7 @@ wasm-timer,https://github.com/tomaka/wasm-timer,MIT,Pierre Krieger diff --git a/Makefile b/Makefile index ffd4b191f7008..090ea11050248 100644 --- a/Makefile +++ b/Makefile @@ -374,7 +374,7 @@ test-integration: test-integration-databend test-integration-docker-logs test-in test-integration: test-integration-eventstoredb test-integration-fluent test-integration-gcp test-integration-greptimedb test-integration-humio test-integration-http-client test-integration-influxdb test-integration: test-integration-kafka test-integration-logstash test-integration-loki test-integration-mongodb test-integration-nats test-integration: test-integration-nginx test-integration-opentelemetry test-integration-postgres test-integration-prometheus test-integration-pulsar -test-integration: test-integration-redis test-integration-splunk test-integration-dnstap test-integration-datadog-agent test-integration-datadog-logs test-integration-e2e-datadog-logs test-integration-e2e-opentelemetry-logs +test-integration: test-integration-redis test-integration-sentry test-integration-splunk test-integration-dnstap test-integration-datadog-agent test-integration-datadog-logs test-integration-e2e-datadog-logs test-integration-e2e-opentelemetry-logs test-integration: 
test-integration-datadog-traces test-integration-shutdown test-integration-%-cleanup: diff --git a/changelog.d/add_sentry_sink.feature.md b/changelog.d/add_sentry_sink.feature.md new file mode 100644 index 0000000000000..56759edc5ed27 --- /dev/null +++ b/changelog.d/add_sentry_sink.feature.md @@ -0,0 +1,3 @@ +Introduced a new `sentry` sink that sends log events to [Sentry](https://sentry.io)'s [structured logging product](https://docs.sentry.io/product/explore/logs/). + +authors: AbhiPrasad diff --git a/scripts/integration/sentry/compose.yaml b/scripts/integration/sentry/compose.yaml new file mode 100644 index 0000000000000..5b5a1c56d7e61 --- /dev/null +++ b/scripts/integration/sentry/compose.yaml @@ -0,0 +1,14 @@ +version: "3" + +services: + sentry-mock: + image: docker.io/wiremock/wiremock:${CONFIG_VERSION} + command: + - --global-response-templating + - --port + - "8080" + - --verbose + ports: + - "8080:8080" + volumes: + - ./wiremock-mappings:/home/wiremock/mappings:ro diff --git a/scripts/integration/sentry/test.yaml b/scripts/integration/sentry/test.yaml new file mode 100644 index 0000000000000..6f0947b13ce20 --- /dev/null +++ b/scripts/integration/sentry/test.yaml @@ -0,0 +1,18 @@ +features: + - sentry-integration-tests + +test_filter: "::sentry::" + +env: + SENTRY_MOCK_ENDPOINT: http://sentry-mock:8080 + +matrix: + version: [latest] + +# changes to these files/paths will invoke the integration test in CI # expressions are evaluated using https://github.com/micromatch/picomatch paths: + - "src/internal_events/sentry.rs" + - "src/sinks/sentry/**" + - "src/sinks/util/**" + - "scripts/integration/sentry/**" diff --git a/scripts/integration/sentry/wiremock-mappings/sentry-envelope.json b/scripts/integration/sentry/wiremock-mappings/sentry-envelope.json new file mode 100644 index 0000000000000..a8e8940822763 --- /dev/null +++ b/scripts/integration/sentry/wiremock-mappings/sentry-envelope.json @@ -0,0 +1,38 @@ +{ + "request": { + "method": "POST", + "urlPathPattern": "/api/[0-9]+/envelope/", + "headers": { + "Content-Type": { + "equalTo": "application/x-sentry-envelope" + }, + "X-Sentry-Auth": { + "matches": "Sentry sentry_version=7, sentry_key=.*" + }, + "User-Agent": { + "equalTo": "sentry.vector/0.1.0" + } + } + }, + "response": { + "status": 200, + "headers": { + "Content-Type": "application/json" + }, + "body": "{\"id\": \"{{randomValue type='UUID'}}\"}", + "transformers": ["response-template"] + }, + "postServeActions": [ + { + "name": "webhook", + "parameters": { + "method": "POST", + "url": "http://localhost:8081/envelope-received", + "headers": { + "Content-Type": "application/json" + }, + "body": "{{jsonPath request.body '$'}}" + } + } + ] +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 7a45737149708..fc0728d48a630 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -121,6 +121,8 @@ mod remap; mod sample; #[cfg(feature = "sinks-sematext")] mod sematext_metrics; +#[cfg(feature = "sinks-sentry")] +mod sentry; mod socket; #[cfg(any(feature = "sources-splunk_hec", feature = "sinks-splunk_hec"))] mod splunk_hec; @@ -273,6 +275,8 @@ pub(crate) use self::remap::*; pub(crate) use self::sample::*; #[cfg(feature = "sinks-sematext")] pub(crate) use self::sematext_metrics::*; +#[cfg(feature = "sinks-sentry")] +pub(crate) use self::sentry::*; #[cfg(any(feature = "sources-splunk_hec", feature = "sinks-splunk_hec"))] pub(crate) use self::splunk_hec::*; #[cfg(feature = "sinks-statsd")] diff --git
a/src/internal_events/sentry.rs b/src/internal_events/sentry.rs new file mode 100644 index 0000000000000..dd78471abb98e --- /dev/null +++ b/src/internal_events/sentry.rs @@ -0,0 +1,148 @@ +use metrics::counter; +use vector_lib::internal_event::InternalEvent; +use vector_lib::internal_event::{ComponentEventsDropped, UNINTENTIONAL, error_stage, error_type}; + +/// Emitted when a log event is successfully converted to a Sentry log and encoded into an envelope. +#[derive(Debug)] +pub struct SentryEventEncoded { + pub byte_size: usize, + pub log_count: usize, +} + +impl InternalEvent for SentryEventEncoded { + fn emit(self) { + trace!( + message = "Events encoded for Sentry.", + byte_size = %self.byte_size, + log_count = %self.log_count, + ); + counter!("component_sent_events_total").increment(self.log_count as u64); + counter!("component_sent_event_bytes_total").increment(self.byte_size as u64); + } +} + +/// Emitted when there's an error encoding a log event as a Sentry envelope. +#[derive(Debug)] +pub struct SentryEncodingError { + pub error: std::io::Error, +} + +impl InternalEvent for SentryEncodingError { + fn emit(self) { + let reason = "Failed to encode Sentry envelope."; + error!( + message = reason, + error = %self.error, + error_code = "encoding_failed", + error_type = error_type::ENCODER_FAILED, + stage = error_stage::PROCESSING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", + "error_code" => "encoding_failed", + "error_type" => error_type::ENCODER_FAILED, + "stage" => error_stage::PROCESSING, + ) + .increment(1); + + emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason }); + } +} + +/// Emitted when DSN parsing fails during configuration or healthcheck. +#[derive(Debug)] +pub struct SentryInvalidDsnError<E> { + pub error: E, + pub dsn: String, +} + +impl<E: std::fmt::Display> InternalEvent for SentryInvalidDsnError<E> { + fn emit(self) { + error!( + message = "Invalid Sentry DSN provided.", + error = %self.error, + dsn = %self.dsn, + error_code = "invalid_dsn", + error_type = error_type::CONFIGURATION_FAILED, + stage = error_stage::PROCESSING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", + "error_code" => "invalid_dsn", + "error_type" => error_type::CONFIGURATION_FAILED, + "stage" => error_stage::PROCESSING, + ) + .increment(1); + } +} + +/// Emitted when a non-log event is dropped because the Sentry sink only supports log events.
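+/// The drop is reported through `ComponentEventsDropped`, mirroring the encoding-error path above.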
+#[derive(Debug)] +pub struct SentryEventTypeError { + pub event_type: String, +} + +impl InternalEvent for SentryEventTypeError { + fn emit(self) { + let reason = "Event type not supported by Sentry sink."; + debug!( + message = reason, + event_type = %self.event_type, + error_code = "unsupported_event_type", + error_type = error_type::CONVERSION_FAILED, + stage = error_stage::PROCESSING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", + "error_code" => "unsupported_event_type", + "error_type" => error_type::CONVERSION_FAILED, + "stage" => error_stage::PROCESSING, + ) + .increment(1); + + emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_sentry_event_encoded_internal_event() { + // Just ensure the event can be created and emitted without panicking + let event = SentryEventEncoded { + byte_size: 1024, + log_count: 5, + }; + event.emit(); + } + + #[test] + fn test_sentry_encoding_error_internal_event() { + let event = SentryEncodingError { + error: std::io::Error::new(std::io::ErrorKind::InvalidData, "test error"), + }; + event.emit(); + } + + #[test] + fn test_sentry_invalid_dsn_error_internal_event() { + let event = SentryInvalidDsnError { + error: "Invalid format", + dsn: "invalid-dsn".to_string(), + }; + event.emit(); + } + + #[test] + fn test_sentry_event_type_error_internal_event() { + let event = SentryEventTypeError { + event_type: "metric".to_string(), + }; + event.emit(); + } +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index b5a45a462566e..9dc96c77ff094 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -100,6 +100,8 @@ pub mod redis; pub mod s3_common; #[cfg(feature = "sinks-sematext")] pub mod sematext; +#[cfg(feature = "sinks-sentry")] +pub mod sentry; #[cfg(feature = "sinks-socket")] pub mod socket; #[cfg(feature = "sinks-splunk_hec")] diff --git a/src/sinks/sentry/config.rs b/src/sinks/sentry/config.rs new file mode 100644 index 0000000000000..040e115929693 --- /dev/null +++ b/src/sinks/sentry/config.rs @@ -0,0 +1,233 @@ +//! Configuration for the `sentry` sink. + +use futures::FutureExt; +use sentry::{IntoDsn, types::Dsn}; +use vector_lib::configurable::configurable_component; + +use crate::{ + codecs::EncodingConfig, + http::HttpClient, + internal_events::SentryInvalidDsnError, + sinks::{ + prelude::*, + util::{ + BatchConfig, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt, + http::{HttpService, RequestConfig, http_response_retry_logic}, + }, + }, +}; + +use super::encoder::SentryEncoder; +use super::request_builder::SentryRequestBuilder; +use super::service::SentryServiceRequestBuilder; +use super::sink::SentrySink; + +/// Configuration for the Sentry sink. +#[configurable_component(sink("sentry"))] +#[derive(Clone, Debug)] +pub struct SentryConfig { + /// Sentry Data Source Name (DSN). + /// + /// The DSN tells the SDK where to send events so they are associated with the correct project.
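+ /// The sink derives the envelope API URL and the `X-Sentry-Auth` header (via the public key) from this value.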
+ /// Format: {PROTOCOL}://{PUBLIC_KEY}@{HOST}/{PROJECT_ID} + #[configurable(metadata( + docs::examples = "https://abcdef1234567890@o123456.ingest.sentry.io/9876543" + ))] + pub dsn: String, + + #[configurable(derived)] + #[serde(default)] + pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>, + + #[configurable(derived)] + #[serde(default)] + pub request: RequestConfig, + + #[configurable(derived)] + pub tls: Option<TlsConfig>, + + #[configurable(derived)] + pub encoding: EncodingConfig, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::is_default" + )] + pub acknowledgements: AcknowledgementsConfig, +} + +impl GenerateConfig for SentryConfig { + fn generate_config() -> toml::Value { + toml::from_str( + r#"dsn = "https://your-public-key@your-sentry-host/your-project-id" +encoding.codec = "json""#, + ) + .unwrap() + } +} + +#[async_trait] +#[typetag::serde(name = "sentry")] +impl SinkConfig for SentryConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let batch_settings = self.batch.validate()?.into_batcher_settings()?; + + let tls = TlsSettings::from_options(self.tls.as_ref())?; + let client = HttpClient::new(tls, cx.proxy())?; + + let dsn: Dsn = self + .dsn + .clone() + .into_dsn() + .map_err(|e| { + emit!(SentryInvalidDsnError { + error: format!("{:?}", e), + dsn: self.dsn.clone(), + }); + format!("Invalid DSN: {:?}", e) + })? + .ok_or_else(|| { + emit!(SentryInvalidDsnError { + error: "Failed to parse DSN".to_string(), + dsn: self.dsn.clone(), + }); + "Failed to parse DSN" + })?; + + let transformer = self.encoding.transformer(); + let encoder = SentryEncoder::new(transformer); + let request_builder = SentryRequestBuilder::new(encoder); + let sentry_service_request_builder = SentryServiceRequestBuilder::new(dsn); + + let service = HttpService::new(client, sentry_service_request_builder); + + let request_limits = self.request.tower.into_settings(); + let service = ServiceBuilder::new() + .settings(request_limits, http_response_retry_logic()) + .service(service); + + let sink = SentrySink::new(service, batch_settings, request_builder)?; + + // Healthcheck validates DSN format + let healthcheck = healthcheck_dsn(self.dsn.clone()).boxed(); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + Input::log() + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +async fn healthcheck_dsn(dsn_str: String) -> crate::Result<()> { + let _dsn: Dsn = dsn_str + .clone() + .into_dsn() + .map_err(|e| { + emit!(SentryInvalidDsnError { + error: format!("{:?}", e), + dsn: dsn_str.clone(), + }); + format!("Invalid DSN: {:?}", e) + })?
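+ // `into_dsn` yields `Ok(None)` for an empty string, so `None` is mapped to a parse failure as well.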
+ .ok_or_else(|| { + emit!(SentryInvalidDsnError { + error: "Failed to parse DSN".to_string(), + dsn: dsn_str.clone(), + }); + "Failed to parse DSN".to_string() + })?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::<SentryConfig>(); + } + + #[test] + fn parse_config() { + let cfg = toml::from_str::<SentryConfig>( + r#" + dsn = "https://abcdef1234567890@o123456.ingest.sentry.io/9876543" + encoding.codec = "json" + "#, + ) + .unwrap(); + + assert_eq!( + cfg.dsn, + "https://abcdef1234567890@o123456.ingest.sentry.io/9876543" + ); + } + + #[test] + fn parse_config_with_batch_settings() { + let cfg = toml::from_str::<SentryConfig>( + r#" + dsn = "https://key@host/project" + encoding.codec = "json" + batch.max_events = 100 + batch.timeout_secs = 5 + "#, + ) + .unwrap(); + + assert_eq!(cfg.dsn, "https://key@host/project"); + assert_eq!(cfg.batch.max_events, Some(100)); + assert_eq!(cfg.batch.timeout_secs, Some(5.0)); + } + + #[test] + fn parse_config_invalid_dsn_format() { + let result = toml::from_str::<SentryConfig>( + r#" + dsn = "invalid-dsn-format" + encoding.codec = "json" + "#, + ); + + // This should parse successfully at the config level; + // DSN validation happens at build time + assert!(result.is_ok()); + } + + #[tokio::test] + async fn healthcheck_dsn_valid() { + let result = healthcheck_dsn( + "https://abcdef1234567890@o123456.ingest.sentry.io/9876543".to_string(), + ) + .await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn healthcheck_dsn_invalid() { + let result = healthcheck_dsn("invalid-dsn".to_string()).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Invalid DSN")); + } + + #[tokio::test] + async fn healthcheck_dsn_empty() { + let result = healthcheck_dsn("".to_string()).await; + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("Failed to parse DSN") + ); + } +} diff --git a/src/sinks/sentry/constants.rs b/src/sinks/sentry/constants.rs new file mode 100644 index 0000000000000..5bee7d1d53c4a --- /dev/null +++ b/src/sinks/sentry/constants.rs @@ -0,0 +1,11 @@ +pub const SDK_NAME: &str = "sentry.vector"; +pub const SDK_VERSION: &str = "0.1.0"; + +// HTTP headers and content types +pub const CONTENT_TYPE_SENTRY_ENVELOPE: &str = "application/x-sentry-envelope"; +pub const AUTH_HEADER_NAME: &str = "X-Sentry-Auth"; +pub const USER_AGENT: &str = "sentry.vector/0.1.0"; + +// Sentry protocol constants +pub const SENTRY_VERSION: u8 = 7; +pub const SENTRY_CLIENT: &str = "sentry.vector/0.1.0"; diff --git a/src/sinks/sentry/encoder.rs b/src/sinks/sentry/encoder.rs new file mode 100644 index 0000000000000..af7c3a0de1228 --- /dev/null +++ b/src/sinks/sentry/encoder.rs @@ -0,0 +1,210 @@ +//! Encoding for the `sentry` sink.
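+//!
+//! Log events in a batch are converted to Sentry logs and wrapped in a single `ItemContainer::Logs` envelope item, which is then serialized with `Envelope::to_writer`.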
+ +use sentry::Envelope; +use sentry::protocol::{EnvelopeItem, ItemContainer}; +use std::io; + +use crate::{ + event::Event, + internal_events::{SentryEncodingError, SentryEventEncoded, SentryEventTypeError}, + sinks::{ + prelude::*, + util::encoding::{Encoder, write_all}, + }, +}; +use vector_lib::config::telemetry; + +use super::log_convert::convert_to_sentry_log; + +#[derive(Clone)] +pub(super) struct SentryEncoder { + pub(super) transformer: Transformer, +} + +impl SentryEncoder { + pub(super) const fn new(transformer: Transformer) -> Self { + Self { transformer } + } +} + +// Implement the encoder trait for our Sentry encoder +impl Encoder<Vec<Event>> for SentryEncoder { + fn encode_input( + &self, + events: Vec<Event>, + writer: &mut dyn std::io::Write, + ) -> std::io::Result<(usize, vector_lib::request_metadata::GroupedCountByteSize)> { + let mut sentry_logs = Vec::new(); + let mut byte_size = telemetry().create_request_count_byte_size(); + + for mut event in events { + self.transformer.transform(&mut event); + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + + match event { + Event::Log(log_event) => { + sentry_logs.push(convert_to_sentry_log(&log_event)); + } + Event::Metric(_) => { + emit!(SentryEventTypeError { + event_type: "metric".to_string(), + }); + } + Event::Trace(_) => { + emit!(SentryEventTypeError { + event_type: "trace".to_string(), + }); + } + } + } + + if sentry_logs.is_empty() { + return Ok(( + 0, + vector_lib::request_metadata::GroupedCountByteSize::default(), + )); + } + + let num_logs = sentry_logs.len(); + + // Create envelope with ItemContainer + let item_container = ItemContainer::Logs(sentry_logs); + let envelope_item = EnvelopeItem::ItemContainer(item_container); + + // Create envelope with the item + let mut envelope = Envelope::new(); + envelope.add_item(envelope_item); + + // Serialize the envelope to bytes and write + let mut envelope_bytes = Vec::new(); + envelope.to_writer(&mut envelope_bytes).map_err(|e| { + let io_error = io::Error::new(io::ErrorKind::InvalidData, e); + emit!(SentryEncodingError { + error: io::Error::new( + io::ErrorKind::InvalidData, + "Failed to serialize Sentry envelope" + ), + }); + io_error + })?; + + if let Err(e) = write_all(writer, num_logs, &envelope_bytes) { + emit!(SentryEncodingError { + error: io::Error::new(e.kind(), format!("Failed to write envelope: {}", e)) + }); + return Err(e); + } + + // Emit success event + emit!(SentryEventEncoded { + byte_size: envelope_bytes.len(), + log_count: num_logs, + }); + + Ok((envelope_bytes.len(), byte_size)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::codecs::Transformer; + use vector_lib::event::{Event, LogEvent, Metric, MetricKind, MetricValue}; + + fn create_test_encoder() -> SentryEncoder { + SentryEncoder::new(Transformer::default()) + } + + fn create_test_log_event() -> Event { + let mut log = LogEvent::from("test message"); + log.insert("level", "info"); + log.insert("trace_id", "12345678-1234-1234-1234-123456789012"); + log.insert("custom_field", "custom_value"); + Event::Log(log) + } + + #[test] + fn test_encode_single_log_event() { + let encoder = create_test_encoder(); + let event = create_test_log_event(); + let mut writer = Vec::new(); + + let result = encoder.encode_input(vec![event], &mut writer); + + assert!(result.is_ok()); + let (bytes_written, byte_size) = result.unwrap(); + assert!(bytes_written > 0); + assert!(!writer.is_empty()); + assert!(byte_size.size().unwrap().0 > 0); + } + + #[test] + fn test_encode_multiple_log_events() { + let
encoder = create_test_encoder(); + let events = vec![ + create_test_log_event(), + create_test_log_event(), + create_test_log_event(), + ]; + let mut writer = Vec::new(); + + let result = encoder.encode_input(events, &mut writer); + + assert!(result.is_ok()); + let (bytes_written, byte_size) = result.unwrap(); + assert!(bytes_written > 0); + assert!(!writer.is_empty()); + assert!(byte_size.size().unwrap().0 > 0); + } + + #[test] + fn test_encode_empty_events() { + let encoder = create_test_encoder(); + let events = vec![]; + let mut writer = Vec::new(); + + let result = encoder.encode_input(events, &mut writer); + + assert!(result.is_ok()); + let (bytes_written, byte_size) = result.unwrap(); + assert_eq!(bytes_written, 0); + assert!(writer.is_empty()); + assert_eq!(byte_size.size().unwrap().0, 0); + } + + #[test] + fn test_encode_non_log_events() { + let encoder = create_test_encoder(); + let metric = Event::Metric(Metric::new( + "test_counter", + MetricKind::Incremental, + MetricValue::Counter { value: 1.0 }, + )); + let mut writer = Vec::new(); + + let result = encoder.encode_input(vec![metric], &mut writer); + + // Should succeed but write nothing as metrics are skipped + assert!(result.is_ok()); + let (bytes_written, byte_size) = result.unwrap(); + assert_eq!(bytes_written, 0); + assert!(writer.is_empty()); + // Since no log events were processed, the encoder returns a default GroupedCountByteSize + assert_eq!(byte_size.size().unwrap().0, 0); + } + + #[test] + fn test_encode_log_with_transformer() { + let transformer = Transformer::default(); + let encoder = SentryEncoder::new(transformer); + let event = create_test_log_event(); + let mut writer = Vec::new(); + + let result = encoder.encode_input(vec![event], &mut writer); + + assert!(result.is_ok()); + let (bytes_written, byte_size) = result.unwrap(); + assert!(bytes_written > 0); + assert!(byte_size.size().unwrap().0 > 0); + } +} diff --git a/src/sinks/sentry/integration_tests.rs b/src/sinks/sentry/integration_tests.rs new file mode 100644 index 0000000000000..680eab52b3dfb --- /dev/null +++ b/src/sinks/sentry/integration_tests.rs @@ -0,0 +1,225 @@ +#![cfg(test)] + +use std::time::Duration; + +use futures::stream; +use sentry::{Envelope, protocol::EnvelopeItem}; +use tokio::time::timeout; +use wiremock::{ + Mock, MockServer, ResponseTemplate, + matchers::{header, method, path_regex}, +}; + +use super::config::SentryConfig; +use vector_lib::codecs::encoding::format::JsonSerializerOptions; +use vector_lib::codecs::{JsonSerializerConfig, MetricTagValues}; + +use crate::{ + codecs::{EncodingConfig, Transformer}, + config::{AcknowledgementsConfig, SinkConfig, SinkContext}, + event::{Event, LogEvent}, + sinks::util::BatchConfig, + sinks::util::http::RequestConfig, + test_util::{ + components::{SINK_TAGS, run_and_assert_sink_compliance}, + trace_init, + }, +}; + +async fn sentry_mock_server() -> MockServer { + let mock_server = MockServer::start().await; + + // Mock the Sentry envelope endpoint + Mock::given(method("POST")) + .and(path_regex(r"/api/[0-9]+/envelope/")) + .and(header("Content-Type", "application/x-sentry-envelope")) + .and(header("User-Agent", "sentry.vector/0.1.0")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "id": "test-event-id" + }))) + .mount(&mock_server) + .await; + + mock_server +} + +#[tokio::test] +async fn sentry_sink_handles_log_events() { + trace_init(); + + let mock_server = sentry_mock_server().await; + let dsn = format!( + "http://test-key@{}/123", + 
mock_server.uri().replace("http://", "") + ); + + let config = SentryConfig { + dsn: dsn.clone(), + batch: BatchConfig::default(), + request: RequestConfig::default(), + tls: None, + encoding: EncodingConfig::new( + JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default()) + .into(), + Transformer::default(), + ), + acknowledgements: AcknowledgementsConfig::default(), + }; + + let cx = SinkContext::default(); + let (sink, _) = config.build(cx).await.unwrap(); + + let mut log_event = LogEvent::from("test message"); + log_event.insert("level", "info"); + log_event.insert("logger", "vector.test"); + + let events = vec![Event::Log(log_event)]; + let events = stream::iter(events); + + run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; + + // Verify the mock server received the request + // Give some time for the request to be processed + timeout(Duration::from_secs(5), async { + loop { + let requests = mock_server.received_requests().await.unwrap(); + if !requests.is_empty() { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .expect("Should have received at least one request"); + + let requests = mock_server.received_requests().await.unwrap(); + assert!( + !requests.is_empty(), + "Should have received at least one request" + ); + + // Verify the request contains a properly formatted Sentry envelope + let request = &requests[0]; + assert_eq!(request.method, "POST"); + // Check that the path follows the Sentry envelope API format: /api/{PROJECT_ID}/envelope/ + let path = request.url.path(); + assert!( + path.starts_with("/api/") && path.ends_with("/envelope/"), + "Expected a Sentry envelope API path but got: {}", + path + ); + + // Verify headers + assert_eq!( + request + .headers + .get("Content-Type") + .unwrap() + .to_str() + .unwrap(), + "application/x-sentry-envelope" + ); + assert_eq!( + request.headers.get("User-Agent").unwrap().to_str().unwrap(), + "sentry.vector/0.1.0" + ); + + // Parse and verify the envelope structure + let body = &request.body; + assert!(!body.is_empty(), "Request body should not be empty"); + + // The envelope should be parseable by the Sentry library + let envelope = Envelope::from_slice(body); + assert!(envelope.is_ok(), "Should be able to parse Sentry envelope"); + + let envelope = envelope.unwrap(); + let mut items_count = 0; + for item in envelope.items() { + items_count += 1; + // Log events can be converted to different Sentry item types + match item { + EnvelopeItem::Event(_) => { + // This is a Sentry event (expected for errors/exceptions) + } + EnvelopeItem::Transaction(_) => { + // This might be a transaction if the log has performance data + } + _ => { + // Accept other types as well, since log events might be encoded differently + } + } + } + assert!(items_count > 0, "Envelope should contain items"); +} + +#[tokio::test] +async fn sentry_sink_multiple_events() { + trace_init(); + + let mock_server = sentry_mock_server().await; + let dsn = format!( + "http://test-key@{}/123", + mock_server.uri().replace("http://", "") + ); + + let config = SentryConfig { + dsn: dsn.clone(), + batch: BatchConfig::default(), + request: RequestConfig::default(), + tls: None, + encoding: EncodingConfig::new( + JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default()) + .into(), + Transformer::default(), + ), + acknowledgements: AcknowledgementsConfig::default(), + }; + + let cx = SinkContext::default(); + let (sink, _) = config.build(cx).await.unwrap(); + +
// Create multiple log events + let mut events = Vec::new(); + for i in 0..3 { + let mut log_event = LogEvent::from(format!("test message {}", i)); + log_event.insert("level", "error"); + log_event.insert("logger", "vector.test"); + log_event.insert("custom_field", format!("value_{}", i)); + events.push(Event::Log(log_event)); + } + + let events = stream::iter(events); + run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; + + // Wait for requests to be processed + timeout(Duration::from_secs(5), async { + loop { + let requests = mock_server.received_requests().await.unwrap(); + if !requests.is_empty() { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .expect("Should have received at least one request"); + + let requests = mock_server.received_requests().await.unwrap(); + assert!( + !requests.is_empty(), + "Should have received at least one request" + ); + + // Since we're batching, we might get multiple requests or a single request with multiple events + // Let's verify that we can parse all the envelopes + for request in &requests { + let envelope = Envelope::from_slice(&request.body); + assert!(envelope.is_ok(), "Should be able to parse Sentry envelope"); + let envelope = envelope.unwrap(); + let mut items_count = 0; + for _item in envelope.items() { + items_count += 1; + } + assert!(items_count > 0, "Envelope should contain items"); + } +} diff --git a/src/sinks/sentry/log_convert.rs b/src/sinks/sentry/log_convert.rs new file mode 100644 index 0000000000000..d92adf9ad08fb --- /dev/null +++ b/src/sinks/sentry/log_convert.rs @@ -0,0 +1,427 @@ +//! Vector log event to Sentry log conversion utilities. + +use sentry::protocol::{Log, LogAttribute, LogLevel, Map, TraceId, Value}; +use std::time::SystemTime; + +use super::constants::{SDK_NAME, SDK_VERSION}; + +/// Extract trace ID from log event, returning the trace ID and which field was used. +pub fn extract_trace_id(log: &vector_lib::event::LogEvent) -> (TraceId, Option<&'static str>) { + let trace_fields = ["trace_id", "sentry.trace_id"]; + for field_name in &trace_fields { + if let Some(trace_value) = log.get(*field_name) { + let trace_str = trace_value.to_string_lossy(); + if let Ok(uuid) = uuid::Uuid::parse_str(&trace_str) { + // Convert UUID to bytes and then to TraceId + return (TraceId::from(uuid.into_bytes()), Some(*field_name)); + } + } + } + + // Create a zero'd out trace ID (16 bytes of zeros for UUID). This is special cased + // during sentry ingestion. + let default_trace_id: TraceId = TraceId::from([0u8; 16]); + + (default_trace_id, None) +} + +/// Convert a Vector log event to a Sentry log. 
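+///
+/// Unknown or missing levels fall back to `LogLevel::Info`, and a missing timestamp falls back to `SystemTime::now()`.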
+pub fn convert_to_sentry_log(log: &vector_lib::event::LogEvent) -> Log { + // Extract timestamp + let timestamp = log + .get_timestamp() + .and_then(|ts| ts.as_timestamp()) + .map(|ts| (*ts).into()) + .unwrap_or_else(SystemTime::now); + + // Extract message + let body = log + .get_message() + .map(|msg| msg.to_string_lossy().into_owned()) + .unwrap_or_default(); + + // Extract level + let level = log + .get("level") + .or_else(|| log.get("severity")) + .or_else(|| log.get("sentry.level")) + .or_else(|| log.get("sentry.severity")) + .map( + |level_value| match level_value.to_string_lossy().to_lowercase().as_str() { + "trace" => LogLevel::Trace, + "debug" => LogLevel::Debug, + "info" => LogLevel::Info, + "warn" | "warning" => LogLevel::Warn, + "error" | "err" => LogLevel::Error, + "fatal" | "critical" | "alert" | "emergency" => LogLevel::Fatal, + _ => LogLevel::Info, + }, + ) + .unwrap_or(LogLevel::Info); + + // Extract trace ID and determine which field was used + let (trace_id, used_trace_field) = extract_trace_id(log); + + // Convert fields to attributes + let attributes = convert_fields_to_attributes(log, used_trace_field); + + Log { + level, + body, + trace_id: Some(trace_id), + timestamp, + severity_number: None, // We could map this from level if needed + attributes, + } +} + +/// Convert log event fields to Sentry log attributes, excluding specified fields. +/// +/// See https://develop.sentry.dev/sdk/telemetry/logs/#log-envelope-item +pub fn convert_fields_to_attributes( + log: &vector_lib::event::LogEvent, + used_trace_field: Option<&str>, +) -> Map<String, LogAttribute> { + let mut attributes = Map::new(); + + attributes.insert( + "sentry.sdk.name".to_string(), + LogAttribute(Value::String(SDK_NAME.to_string())), + ); + attributes.insert( + "sentry.sdk.version".to_string(), + LogAttribute(Value::String(SDK_VERSION.to_string())), + ); + + if let Some(fields) = log.all_event_fields() { + for (key, value) in fields { + let key_str = key.as_str(); + if key_str != "message" + && key_str != "level" + && key_str != "severity" + && key_str != "timestamp" + && Some(key_str) != used_trace_field + { + let sentry_value = match value { + vrl::value::Value::Bytes(b) => { + Value::String(String::from_utf8_lossy(b).to_string()) + } + vrl::value::Value::Integer(i) => Value::Number(serde_json::Number::from(*i)), + vrl::value::Value::Float(f) => { + // Ensure we're using 64-bit floating point as per Sentry protocol + let float_val = f.into_inner(); + if let Some(n) = serde_json::Number::from_f64(float_val) { + Value::Number(n) + } else { + // If the float can't be represented as a JSON number, convert to string + Value::String(float_val.to_string()) + } + } + vrl::value::Value::Boolean(b) => Value::Bool(*b), + _ => Value::String(value.to_string_lossy().to_string()), + }; + attributes.insert(key_str.to_string(), LogAttribute(sentry_value)); + } + } + } + attributes +} + +#[cfg(test)] +mod tests { + use super::*; + use sentry::protocol::{LogLevel, TraceId}; + use vector_lib::event::LogEvent; + use vrl::value::Value; + + fn create_test_log_event() -> LogEvent { + let mut log = LogEvent::from("test message"); + log.insert("level", "info"); + log.insert("custom_field", "custom_value"); + log + } + + #[test] + fn test_extract_trace_id_from_trace_id_field() { + let mut log = LogEvent::from("test"); + log.insert("trace_id", "550e8400-e29b-41d4-a716-446655440000"); + + let (trace_id, used_field) = extract_trace_id(&log); + + assert_ne!(trace_id, TraceId::from([0u8; 16])); // Not the default zero trace ID + assert_eq!(used_field,
Some("trace_id")); + } + + #[test] + fn test_extract_trace_id_from_sentry_trace_id_field() { + let mut log = LogEvent::from("test"); + log.insert("sentry.trace_id", "550e8400-e29b-41d4-a716-446655440000"); + + let (trace_id, used_field) = extract_trace_id(&log); + + assert_ne!(trace_id, TraceId::from([0u8; 16])); // Not the default zero trace ID + assert_eq!(used_field, Some("sentry.trace_id")); + } + + #[test] + fn test_extract_trace_id_invalid_uuid() { + let mut log = LogEvent::from("test"); + log.insert("trace_id", "invalid-uuid-format"); + + let (trace_id, used_field) = extract_trace_id(&log); + + assert_eq!(trace_id, TraceId::from([0u8; 16])); // Should be default zero trace ID + assert_eq!(used_field, None); + } + + #[test] + fn test_extract_trace_id_missing() { + let log = LogEvent::from("test"); + + let (trace_id, used_field) = extract_trace_id(&log); + + assert_eq!(trace_id, TraceId::from([0u8; 16])); // Should be default zero trace ID + assert_eq!(used_field, None); + } + + #[test] + fn test_extract_trace_id_priority() { + let mut log = LogEvent::from("test"); + log.insert("trace_id", "550e8400-e29b-41d4-a716-446655440000"); + log.insert("sentry.trace_id", "123e4567-e89b-12d3-a456-426614174000"); + + let (trace_id, used_field) = extract_trace_id(&log); + + // Should prefer "trace_id" field first + assert_ne!(trace_id, TraceId::from([0u8; 16])); + assert_eq!(used_field, Some("trace_id")); + } + + #[test] + fn test_convert_to_sentry_log_basic() { + let log = create_test_log_event(); + + let sentry_log = convert_to_sentry_log(&log); + + assert_eq!(sentry_log.body, "test message"); + assert_eq!(sentry_log.level, LogLevel::Info); + assert!(sentry_log.trace_id.is_some()); + assert!(sentry_log.attributes.contains_key("sentry.sdk.name")); + assert!(sentry_log.attributes.contains_key("sentry.sdk.version")); + assert!(sentry_log.attributes.contains_key("custom_field")); + } + + #[test] + fn test_convert_to_sentry_log_levels() { + let test_cases = vec![ + ("trace", LogLevel::Trace), + ("debug", LogLevel::Debug), + ("info", LogLevel::Info), + ("warn", LogLevel::Warn), + ("warning", LogLevel::Warn), + ("error", LogLevel::Error), + ("err", LogLevel::Error), + ("fatal", LogLevel::Fatal), + ("critical", LogLevel::Fatal), + ("alert", LogLevel::Fatal), + ("emergency", LogLevel::Fatal), + ("unknown", LogLevel::Info), // Default fallback + ]; + + for (level_str, expected_level) in test_cases { + let mut log = LogEvent::from("test"); + log.insert("level", level_str); + + let sentry_log = convert_to_sentry_log(&log); + + assert_eq!( + sentry_log.level, expected_level, + "Failed for level: {}", + level_str + ); + } + } + + #[test] + fn test_convert_to_sentry_log_level_field_priority() { + let mut log = LogEvent::from("test"); + log.insert("level", "error"); + log.insert("severity", "warn"); + log.insert("sentry.level", "debug"); + log.insert("sentry.severity", "info"); + + let sentry_log = convert_to_sentry_log(&log); + + // Should prefer "level" field first + assert_eq!(sentry_log.level, LogLevel::Error); + } + + #[test] + fn test_convert_to_sentry_log_no_level() { + let log = LogEvent::from("test"); + + let sentry_log = convert_to_sentry_log(&log); + + assert_eq!(sentry_log.level, LogLevel::Info); // Default fallback + } + + #[test] + fn test_convert_to_sentry_log_no_message() { + let mut log = LogEvent::default(); + log.insert("level", "info"); + + let sentry_log = convert_to_sentry_log(&log); + + assert_eq!(sentry_log.body, ""); // Should be empty string + } + + #[test] + fn 
test_convert_to_sentry_log_with_trace_id() { + let mut log = LogEvent::from("test"); + log.insert("trace_id", "550e8400-e29b-41d4-a716-446655440000"); + + let sentry_log = convert_to_sentry_log(&log); + + assert!(sentry_log.trace_id.is_some()); + assert_ne!(sentry_log.trace_id.unwrap(), TraceId::from([0u8; 16])); + // trace_id field should be excluded from attributes + assert!(!sentry_log.attributes.contains_key("trace_id")); + } + + #[test] + fn test_convert_fields_to_attributes_basic() { + let mut log = LogEvent::from("test message"); + log.insert("custom_string", "value"); + log.insert("custom_number", 42); + log.insert("custom_bool", true); + log.insert("custom_float", 3.14); + + let attributes = convert_fields_to_attributes(&log, None); + + // Check SDK attributes are present + assert!(attributes.contains_key("sentry.sdk.name")); + assert!(attributes.contains_key("sentry.sdk.version")); + + // Check custom fields are converted + assert!(attributes.contains_key("custom_string")); + assert!(attributes.contains_key("custom_number")); + assert!(attributes.contains_key("custom_bool")); + assert!(attributes.contains_key("custom_float")); + + // Check reserved fields are excluded + assert!(!attributes.contains_key("message")); + } + + #[test] + fn test_convert_fields_to_attributes_excluded_fields() { + let mut log = LogEvent::from("test message"); + log.insert("level", "info"); + log.insert("severity", "error"); + log.insert("timestamp", "2023-01-01T00:00:00Z"); + log.insert("trace_id", "550e8400-e29b-41d4-a716-446655440000"); + log.insert("custom_field", "should_be_included"); + + let attributes = convert_fields_to_attributes(&log, Some("trace_id")); + + // Reserved fields should be excluded + assert!(!attributes.contains_key("message")); + assert!(!attributes.contains_key("level")); + assert!(!attributes.contains_key("severity")); + assert!(!attributes.contains_key("timestamp")); + assert!(!attributes.contains_key("trace_id")); + + // Custom field should be included + assert!(attributes.contains_key("custom_field")); + + // SDK attributes should always be present + assert!(attributes.contains_key("sentry.sdk.name")); + assert!(attributes.contains_key("sentry.sdk.version")); + } + + #[test] + fn test_convert_fields_to_attributes_value_types() { + let mut log = LogEvent::default(); + log.insert("string_field", "text_value"); + log.insert("int_field", 123); + log.insert("float_field", 45.67); + log.insert("bool_field", false); + log.insert("bytes_field", Value::Bytes("hello".into())); + + let attributes = convert_fields_to_attributes(&log, None); + + // Verify values are properly converted to Sentry Value types + if let Some(attr) = attributes.get("string_field") { + assert!(matches!(attr.0, sentry::protocol::Value::String(_))); + } + if let Some(attr) = attributes.get("int_field") { + assert!(matches!(attr.0, sentry::protocol::Value::Number(_))); + } + if let Some(attr) = attributes.get("float_field") { + assert!(matches!(attr.0, sentry::protocol::Value::Number(_))); + } + if let Some(attr) = attributes.get("bool_field") { + assert!(matches!(attr.0, sentry::protocol::Value::Bool(false))); + } + if let Some(attr) = attributes.get("bytes_field") { + assert!(matches!(attr.0, sentry::protocol::Value::String(_))); + } + } + + #[test] + fn test_convert_fields_to_attributes_float_edge_cases() { + let mut log = LogEvent::default(); + // Note: VRL doesn't allow NaN values to be inserted, so we test with other edge cases + log.insert("inf_float", f64::INFINITY); + log.insert("neg_inf_float", 
f64::NEG_INFINITY); + log.insert("large_float", f64::MAX); + log.insert("small_float", f64::MIN); + + let attributes = convert_fields_to_attributes(&log, None); + + // Infinity and other edge case float values should be handled + assert!(attributes.contains_key("inf_float")); + assert!(attributes.contains_key("neg_inf_float")); + assert!(attributes.contains_key("large_float")); + assert!(attributes.contains_key("small_float")); + } + + #[test] + fn test_convert_fields_to_attributes_sdk_values() { + let log = LogEvent::default(); + + let attributes = convert_fields_to_attributes(&log, None); + + // Check SDK name and version are set correctly + if let Some(sdk_name) = attributes.get("sentry.sdk.name") { + if let sentry::protocol::Value::String(name) = &sdk_name.0 { + assert_eq!(name, SDK_NAME); + } else { + panic!("SDK name should be a string"); + } + } else { + panic!("SDK name should be present"); + } + + if let Some(sdk_version) = attributes.get("sentry.sdk.version") { + if let sentry::protocol::Value::String(version) = &sdk_version.0 { + assert_eq!(version, SDK_VERSION); + } else { + panic!("SDK version should be a string"); + } + } else { + panic!("SDK version should be present"); + } + } + + #[test] + fn test_convert_fields_to_attributes_empty_log() { + let log = LogEvent::default(); + + let attributes = convert_fields_to_attributes(&log, None); + + // Should still have SDK attributes even for empty log + assert!(attributes.contains_key("sentry.sdk.name")); + assert!(attributes.contains_key("sentry.sdk.version")); + assert_eq!(attributes.len(), 2); // Only SDK attributes + } +} diff --git a/src/sinks/sentry/mod.rs b/src/sinks/sentry/mod.rs new file mode 100644 index 0000000000000..d33ec2a6414c9 --- /dev/null +++ b/src/sinks/sentry/mod.rs @@ -0,0 +1,16 @@ +//! The Sentry [`vector_lib::sink::VectorSink`]. +//! +//! This module contains the [`vector_lib::sink::VectorSink`] instance that is responsible for +//! taking a stream of [`vector_lib::event::Event`]s and forwarding them to Sentry via HTTP. +mod config; +mod constants; +mod encoder; +mod log_convert; +mod request_builder; +mod service; +mod sink; + +pub use self::config::SentryConfig; + +#[cfg(feature = "sentry-integration-tests")] +mod integration_tests; diff --git a/src/sinks/sentry/request_builder.rs b/src/sinks/sentry/request_builder.rs new file mode 100644 index 0000000000000..b7de4a32c9754 --- /dev/null +++ b/src/sinks/sentry/request_builder.rs @@ -0,0 +1,57 @@ +//! `RequestBuilder` implementation for the `sentry` sink. 
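+//!
+//! The payload handed to the service layer is a fully serialized Sentry envelope, so `compression()` is `Compression::None`.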
+
+use bytes::Bytes;
+use std::io;
+
+use crate::{
+    event::Event,
+    sinks::{prelude::*, util::http::HttpRequest},
+};
+
+use super::encoder::SentryEncoder;
+
+#[derive(Clone)]
+pub(super) struct SentryRequestBuilder {
+    pub(super) encoder: SentryEncoder,
+}
+
+impl SentryRequestBuilder {
+    pub(super) const fn new(encoder: SentryEncoder) -> Self {
+        Self { encoder }
+    }
+}
+
+impl RequestBuilder<Vec<Event>> for SentryRequestBuilder {
+    type Metadata = EventFinalizers;
+    type Events = Vec<Event>;
+    type Encoder = SentryEncoder;
+    type Payload = Bytes;
+    type Request = HttpRequest<()>;
+    type Error = io::Error;
+
+    fn compression(&self) -> Compression {
+        Compression::None
+    }
+
+    fn encoder(&self) -> &Self::Encoder {
+        &self.encoder
+    }
+
+    fn split_input(
+        &self,
+        mut events: Vec<Event>,
+    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
+        let finalizers = events.take_finalizers();
+        let builder = RequestMetadataBuilder::from_events(&events);
+        (finalizers, builder, events)
+    }
+
+    fn build_request(
+        &self,
+        metadata: Self::Metadata,
+        request_metadata: RequestMetadata,
+        payload: EncodeResult<Self::Payload>,
+    ) -> Self::Request {
+        HttpRequest::new(payload.into_payload(), metadata, request_metadata, ())
+    }
+}
diff --git a/src/sinks/sentry/service.rs b/src/sinks/sentry/service.rs
new file mode 100644
index 0000000000000..ec6fbea0ec918
--- /dev/null
+++ b/src/sinks/sentry/service.rs
@@ -0,0 +1,182 @@
+//! Service implementation for the `sentry` sink.
+
+use bytes::Bytes;
+use http::{Method, Request};
+use sentry::types::Dsn;
+
+use crate::sinks::util::http::{HttpRequest, HttpServiceRequestBuilder};
+
+use super::constants::{
+    AUTH_HEADER_NAME, CONTENT_TYPE_SENTRY_ENVELOPE, SENTRY_CLIENT, SENTRY_VERSION, USER_AGENT,
+};
+
+#[derive(Clone)]
+pub(super) struct SentryServiceRequestBuilder {
+    pub(super) dsn: Dsn,
+}
+
+impl SentryServiceRequestBuilder {
+    pub(super) const fn new(dsn: Dsn) -> Self {
+        Self { dsn }
+    }
+}
+
+impl HttpServiceRequestBuilder<()> for SentryServiceRequestBuilder {
+    fn build(&self, mut request: HttpRequest<()>) -> Result<Request<Bytes>, crate::Error> {
+        let payload = request.take_payload();
+
+        let url = self.dsn.envelope_api_url();
+
+        let mut req = Request::builder()
+            .method(Method::POST)
+            .uri(url.to_string())
+            .header("Content-Type", CONTENT_TYPE_SENTRY_ENVELOPE)
+            .header("User-Agent", USER_AGENT)
+            .body(payload)?;
+
+        // Add authentication header
+        let auth_header = format!(
+            "Sentry sentry_version={}, sentry_key={}, sentry_client={}",
+            SENTRY_VERSION,
+            self.dsn.public_key(),
+            SENTRY_CLIENT
+        );
+        req.headers_mut()
+            .insert(AUTH_HEADER_NAME, http::HeaderValue::from_str(&auth_header)?);
+
+        Ok(req)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::sinks::util::http::HttpRequest;
+    use bytes::Bytes;
+    use http::HeaderValue;
+    use sentry::types::Dsn;
+    use std::str::FromStr;
+    use vector_lib::event::EventFinalizers;
+    use vector_lib::request_metadata::RequestMetadata;
+
+    fn create_test_dsn() -> Dsn {
+        Dsn::from_str("https://abc123@o123456.ingest.sentry.io/123456")
+            .expect("Failed to create test DSN")
+    }
+
+    fn create_test_request() -> HttpRequest<()> {
+        let payload = Bytes::from("test payload");
+        let finalizers = EventFinalizers::default();
+        let metadata = RequestMetadata::default();
+        HttpRequest::new(payload, finalizers, metadata, ())
+    }
+
+    #[test]
+    fn test_sentry_svc_request_builder_new() {
+        let dsn = create_test_dsn();
+        let builder = SentryServiceRequestBuilder::new(dsn.clone());
+
+        assert_eq!(builder.dsn.host(), dsn.host());
+        assert_eq!(builder.dsn.public_key(), dsn.public_key());
+        assert_eq!(builder.dsn.project_id(), dsn.project_id());
+    }
+
+    #[test]
+    fn test_build_request_success() {
+        let dsn = create_test_dsn();
+        let builder = SentryServiceRequestBuilder::new(dsn.clone());
+        let request = create_test_request();
+
+        let result = builder.build(request);
+        assert!(result.is_ok());
+
+        let http_request = result.unwrap();
+
+        // Check method
+        assert_eq!(http_request.method(), &Method::POST);
+
+        // Check URI
+        let expected_url = dsn.envelope_api_url();
+        assert_eq!(http_request.uri().to_string(), expected_url.to_string());
+
+        // Check Content-Type header
+        assert_eq!(
+            http_request.headers().get("Content-Type").unwrap(),
+            &HeaderValue::from_static(CONTENT_TYPE_SENTRY_ENVELOPE)
+        );
+
+        // Check User-Agent header
+        assert_eq!(
+            http_request.headers().get("User-Agent").unwrap(),
+            &HeaderValue::from_static(USER_AGENT)
+        );
+
+        // Check body
+        assert_eq!(http_request.body(), &Bytes::from("test payload"));
+    }
+
+    #[test]
+    fn test_authentication_header_format() {
+        let dsn = create_test_dsn();
+        let builder = SentryServiceRequestBuilder::new(dsn.clone());
+        let request = create_test_request();
+
+        let result = builder.build(request);
+        assert!(result.is_ok());
+
+        let http_request = result.unwrap();
+
+        // Check authentication header
+        let auth_header = http_request.headers().get(AUTH_HEADER_NAME).unwrap();
+        let auth_str = auth_header.to_str().unwrap();
+
+        let expected_auth = format!(
+            "Sentry sentry_version={}, sentry_key={}, sentry_client={}",
+            SENTRY_VERSION,
+            dsn.public_key(),
+            SENTRY_CLIENT
+        );
+
+        assert_eq!(auth_str, expected_auth);
+        assert!(auth_str.contains(&format!("sentry_version={}", SENTRY_VERSION)));
+        assert!(auth_str.contains(&format!("sentry_key={}", dsn.public_key())));
+        assert!(auth_str.contains(&format!("sentry_client={}", SENTRY_CLIENT)));
+    }
+
+    #[test]
+    fn test_build_with_empty_payload() {
+        let dsn = create_test_dsn();
+        let builder = SentryServiceRequestBuilder::new(dsn);
+        let request = HttpRequest::new(
+            Bytes::new(),
+            EventFinalizers::default(),
+            RequestMetadata::default(),
+            (),
+        );
+
+        let result = builder.build(request);
+        assert!(result.is_ok());
+
+        let http_request = result.unwrap();
+        assert_eq!(http_request.body(), &Bytes::new());
+    }
+
+    #[test]
+    fn test_build_with_large_payload() {
+        let dsn = create_test_dsn();
+        let builder = SentryServiceRequestBuilder::new(dsn);
+        let large_payload = Bytes::from(vec![b'x'; 10000]);
+        let request = HttpRequest::new(
+            large_payload.clone(),
+            EventFinalizers::default(),
+            RequestMetadata::default(),
+            (),
+        );
+
+        let result = builder.build(request);
+        assert!(result.is_ok());
+
+        let http_request = result.unwrap();
+        assert_eq!(http_request.body(), &large_payload);
+    }
+}
diff --git a/src/sinks/sentry/sink.rs b/src/sinks/sentry/sink.rs
new file mode 100644
index 0000000000000..6ff4f3aad3a3b
--- /dev/null
+++ b/src/sinks/sentry/sink.rs
@@ -0,0 +1,86 @@
+//! Sentry sink
+//!
+//! This sink sends events to Sentry using the envelope API endpoint.
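The envelope API endpoint mentioned above is derived entirely from the DSN. A small sketch of what the `sentry-types` crate resolves for the test DSN used in `service.rs`, assuming its usual `{scheme}://{host}/api/{project_id}/envelope/` layout:

```rust
// Sketch only: shows how the DSN maps to the envelope endpoint and auth key.
use std::str::FromStr;

use sentry::types::Dsn;

fn main() {
    let dsn = Dsn::from_str("https://abc123@o123456.ingest.sentry.io/123456").unwrap();
    // The key before '@' becomes `sentry_key` in the auth header built above.
    assert_eq!(dsn.public_key(), "abc123");
    // Expected to print https://o123456.ingest.sentry.io/api/123456/envelope/
    println!("{}", dsn.envelope_api_url());
}
```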
+
+use crate::{
+    event::Event,
+    sinks::{
+        prelude::*,
+        util::http::{HttpJsonBatchSizer, HttpRequest},
+    },
+};
+
+use super::request_builder::SentryRequestBuilder;
+
+pub(super) struct SentrySink<S> {
+    service: S,
+    request_settings: SentryRequestBuilder,
+    batch_settings: BatcherSettings,
+}
+
+impl<S> SentrySink<S>
+where
+    S: Service<HttpRequest<()>> + Send + 'static,
+    S::Future: Send + 'static,
+    S::Response: DriverResponse + Send + 'static,
+    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
+{
+    pub(super) fn new(
+        service: S,
+        batch_settings: BatcherSettings,
+        request_settings: SentryRequestBuilder,
+    ) -> Result<Self, crate::Error> {
+        Ok(Self {
+            service,
+            request_settings,
+            batch_settings,
+        })
+    }
+}
+
+#[async_trait]
+impl<S> StreamSink<Event> for SentrySink<S>
+where
+    S: Service<HttpRequest<()>> + Send + 'static,
+    S::Future: Send + 'static,
+    S::Response: DriverResponse + Send + 'static,
+    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
+{
+    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        self.run_inner(input).await
+    }
+}
+
+impl<S> SentrySink<S>
+where
+    S: Service<HttpRequest<()>> + Send + 'static,
+    S::Future: Send + 'static,
+    S::Response: DriverResponse + Send + 'static,
+    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
+{
+    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        input
+            // Batch the input stream with size calculation based on the estimated encoded json size
+            .batched(self.batch_settings.as_item_size_config(HttpJsonBatchSizer))
+            // Build requests with default concurrency limit.
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_settings,
+            )
+            // Filter out any errors that occurred in the request building.
+            .filter_map(|request| async move {
+                match request {
+                    Err(error) => {
+                        emit!(SinkRequestBuildError { error });
+                        None
+                    }
+                    Ok(req) => Some(req),
+                }
+            })
+            // Generate the driver that will send requests and handle retries,
+            // event finalization, and logging/internal metric reporting.
+            .into_driver(self.service)
+            .run()
+            .await
+    }
+}
diff --git a/website/cue/reference/components/sinks/generated/sentry.cue b/website/cue/reference/components/sinks/generated/sentry.cue
new file mode 100644
index 0000000000000..7289490512880
--- /dev/null
+++ b/website/cue/reference/components/sinks/generated/sentry.cue
@@ -0,0 +1,711 @@
+package metadata
+
+generated: components: sinks: sentry: configuration: {
+	acknowledgements: {
+		description: """
+			Controls how acknowledgements are handled for this sink.
+
+			See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.
+
+			[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/
+			"""
+		required: false
+		type: object: options: enabled: {
+			description: """
+				Whether or not end-to-end acknowledgements are enabled.
+
+				When enabled for a sink, any source that supports end-to-end
+				acknowledgements that is connected to that sink waits for events
+				to be acknowledged by **all connected sinks** before acknowledging them at the source.
+
+				Enabling or disabling acknowledgements at the sink level takes precedence over any global
+				[`acknowledgements`][global_acks] configuration.
+
+				[global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
+				"""
+			required: false
+			type: bool: {}
+		}
+	}
+	batch: {
+		description: "Event batching behavior."
+		required: false
+		type: object: options: {
+			max_bytes: {
+				description: """
+					The maximum size of a batch that is processed by a sink.
+
+					This is based on the uncompressed size of the batched events, before they are
+					serialized or compressed.
+					"""
+				required: false
+				type: uint: {
+					default: 10000000
+					unit: "bytes"
+				}
+			}
+			max_events: {
+				description: "The maximum size of a batch before it is flushed."
+				required: false
+				type: uint: unit: "events"
+			}
+			timeout_secs: {
+				description: "The maximum age of a batch before it is flushed."
+				required: false
+				type: float: {
+					default: 1.0
+					unit: "seconds"
+				}
+			}
+		}
+	}
+	dsn: {
+		description: """
+			Sentry Data Source Name (DSN).
+
+			The DSN tells the SDK where to send events so they are associated with the correct project.
+			Format: {PROTOCOL}://{PUBLIC_KEY}@{HOST}/{PROJECT_ID}
+			"""
+		required: true
+		type: string: examples: ["https://abcdef1234567890@o123456.ingest.sentry.io/9876543"]
+	}
+	encoding: {
+		description: """
+			Encoding configuration.
+			Configures how events are encoded into raw bytes.
+			The selected encoding also determines which input types (logs, metrics, traces) are supported.
+			"""
+		required: true
+		type: object: options: {
+			avro: {
+				description: "Apache Avro-specific encoder options."
+				relevant_when: "codec = \"avro\""
+				required: true
+				type: object: options: schema: {
+					description: "The Avro schema."
+					required: true
+					type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
+				}
+			}
+			cef: {
+				description: "The CEF Serializer Options."
+				relevant_when: "codec = \"cef\""
+				required: true
+				type: object: options: {
+					device_event_class_id: {
+						description: """
+							Unique identifier for each event type. Identifies the type of event reported.
+							The value length must be less than or equal to 1023.
+							"""
+						required: true
+						type: string: {}
+					}
+					device_product: {
+						description: """
+							Identifies the product of a vendor.
+							The part of a unique device identifier. No two products can use the same combination of device vendor and device product.
+							The value length must be less than or equal to 63.
+							"""
+						required: true
+						type: string: {}
+					}
+					device_vendor: {
+						description: """
+							Identifies the vendor of the product.
+							The part of a unique device identifier. No two products can use the same combination of device vendor and device product.
+							The value length must be less than or equal to 63.
+							"""
+						required: true
+						type: string: {}
+					}
+					device_version: {
+						description: """
+							Identifies the version of the problem. The combination of the device product, vendor and this value make up the unique id of the device that sends messages.
+							The value length must be less than or equal to 31.
+							"""
+						required: true
+						type: string: {}
+					}
+					extensions: {
+						description: """
+							The collection of key-value pairs. Keys are the keys of the extensions, and values are paths that point to the extension values of a log event.
+							The event can have any number of key-value pairs in any order.
+							"""
+						required: false
+						type: object: options: "*": {
+							description: "This is a path that points to the extension value of a log event."
+							required: true
+							type: string: {}
+						}
+					}
+					name: {
+						description: """
+							This is a path that points to the human-readable description of a log event.
+							The value length must be less than or equal to 512.
+							Equals "cef.name" by default.
+							"""
+						required: true
+						type: string: {}
+					}
+					severity: {
+						description: """
+							This is a path that points to the field of a log event that reflects importance of the event.
+ + It must point to a number from 0 to 10. + 0 = lowest_importance, 10 = highest_importance. + Set to "cef.severity" by default. + """ + required: true + type: string: {} + } + version: { + description: """ + CEF Version. Can be either 0 or 1. + Set to "0" by default. + """ + required: true + type: string: enum: { + V0: "CEF specification version 0.1." + V1: "CEF specification version 1.x." + } + } + } + } + codec: { + description: "The codec to use for encoding events." + required: true + type: string: enum: { + avro: """ + Encodes an event as an [Apache Avro][apache_avro] message. + + [apache_avro]: https://avro.apache.org/ + """ + cef: "Encodes an event as a CEF (Common Event Format) formatted message." + csv: """ + Encodes an event as a CSV message. + + This codec must be configured with fields to encode. + """ + gelf: """ + Encodes an event as a [GELF][gelf] message. + + This codec is experimental for the following reason: + + The GELF specification is more strict than the actual Graylog receiver. + Vector's encoder currently adheres more strictly to the GELF spec, with + the exception that some characters such as `@` are allowed in field names. + + Other GELF codecs, such as Loki's, use a [Go SDK][implementation] that is maintained + by Graylog and is much more relaxed than the GELF spec. + + Going forward, Vector will use that [Go SDK][implementation] as the reference implementation, which means + the codec might continue to relax the enforcement of the specification. + + [gelf]: https://docs.graylog.org/docs/gelf + [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go + """ + json: """ + Encodes an event as [JSON][json]. + + [json]: https://www.json.org/ + """ + logfmt: """ + Encodes an event as a [logfmt][logfmt] message. + + [logfmt]: https://brandur.org/logfmt + """ + native: """ + Encodes an event in the [native Protocol Buffers format][vector_native_protobuf]. + + This codec is **[experimental][experimental]**. + + [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto + [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs + """ + native_json: """ + Encodes an event in the [native JSON format][vector_native_json]. + + This codec is **[experimental][experimental]**. + + [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue + [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs + """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ + raw_message: """ + No encoding. + + This encoding uses the `message` field of a log event. + + Be careful if you are modifying your log events (for example, by using a `remap` + transform) and removing the message field while doing additional parsing on it, as this + could lead to the encoding emitting empty strings for the given event. + """ + text: """ + Plain text encoding. + + This encoding uses the `message` field of a log event. For metrics, it uses an + encoding that resembles the Prometheus export format. + + Be careful if you are modifying your log events (for example, by using a `remap` + transform) and removing the message field while doing additional parsing on it, as this + could lead to the encoding emitting empty strings for the given event. + """ + } + } + csv: { + description: "The CSV Serializer Options." 
+				relevant_when: "codec = \"csv\""
+				required: true
+				type: object: options: {
+					capacity: {
+						description: """
+							Sets the capacity (in bytes) of the internal buffer used in the CSV writer.
+							This defaults to 8KB.
+							"""
+						required: false
+						type: uint: default: 8192
+					}
+					delimiter: {
+						description: "The field delimiter to use when writing CSV."
+						required: false
+						type: ascii_char: default: ","
+					}
+					double_quote: {
+						description: """
+							Enables double quote escapes.
+
+							This is enabled by default, but you can disable it. When disabled, quotes in
+							field data are escaped instead of doubled.
+							"""
+						required: false
+						type: bool: default: true
+					}
+					escape: {
+						description: """
+							The escape character to use when writing CSV.
+
+							In some variants of CSV, quotes are escaped using a special escape character
+							like \\ (instead of escaping quotes by doubling them).
+
+							To use this, `double_quote` needs to be disabled as well; otherwise, this setting is ignored.
+							"""
+						required: false
+						type: ascii_char: default: "\""
+					}
+					fields: {
+						description: """
+							Configures the fields that are encoded, as well as the order in which they
+							appear in the output.
+
+							If a field is not present in the event, the output for that field is an empty string.
+
+							Values of type `Array`, `Object`, and `Regex` are not supported, and the
+							output for any of these types is an empty string.
+							"""
+						required: true
+						type: array: items: type: string: {}
+					}
+					quote: {
+						description: "The quote character to use when writing CSV."
+						required: false
+						type: ascii_char: default: "\""
+					}
+					quote_style: {
+						description: "The quoting style to use when writing CSV data."
+						required: false
+						type: string: {
+							default: "necessary"
+							enum: {
+								always: "Always puts quotes around every field."
+								necessary: """
+									Puts quotes around fields only when necessary.
+									They are necessary when fields contain a quote, delimiter, or record terminator.
+									Quotes are also necessary when writing an empty record
+									(which is indistinguishable from a record with one empty field).
+									"""
+								never: "Never writes quotes, even if it produces invalid CSV data."
+								non_numeric: """
+									Puts quotes around all fields that are non-numeric.
+									This means that when writing a field that does not parse as a valid float or integer,
+									quotes are used even if they aren't strictly necessary.
+									"""
+							}
+						}
+					}
+				}
+			}
+			except_fields: {
+				description: "List of fields that are excluded from the encoded event."
+				required: false
+				type: array: items: type: string: {}
+			}
+			json: {
+				description: "Options for the JsonSerializer."
+				relevant_when: "codec = \"json\""
+				required: false
+				type: object: options: pretty: {
+					description: "Whether to use pretty JSON formatting."
+					required: false
+					type: bool: default: false
+				}
+			}
+			metric_tag_values: {
+				description: """
+					Controls how metric tag values are encoded.
+
+					When set to `single`, only the last non-bare value of tags is displayed with the
+					metric. When set to `full`, all metric tags are exposed as separate assignments.
+					"""
+				relevant_when: "codec = \"json\" or codec = \"text\""
+				required: false
+				type: string: {
+					default: "single"
+					enum: {
+						full: "All tags are exposed as arrays of either string or null values."
+						single: """
+							Tag values are exposed as single strings, the same as they were before this config
+							option. Tags with multiple values show the last assigned value, and null values
+							are ignored.
+							"""
+					}
+				}
+			}
+			only_fields: {
+				description: "List of fields that are included in the encoded event."
+				required: false
+				type: array: items: type: string: {}
+			}
+			protobuf: {
+				description: "Options for the Protobuf serializer."
+				relevant_when: "codec = \"protobuf\""
+				required: true
+				type: object: options: {
+					desc_file: {
+						description: """
+							The path to the protobuf descriptor set file.
+
+							This file is the output of `protoc -I <include path> -o <desc output path> <proto>`
+
+							You can read more [here](https://buf.build/docs/reference/images/#how-buf-images-work).
+							"""
+						required: true
+						type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"]
+					}
+					message_type: {
+						description: "The name of the message type to use for serializing."
+						required: true
+						type: string: examples: ["package.Message"]
+					}
+				}
+			}
+			timestamp_format: {
+				description: "Format used for timestamp fields."
+				required: false
+				type: string: enum: {
+					rfc3339: "Represent the timestamp as an RFC 3339 timestamp."
+					unix: "Represent the timestamp as a Unix timestamp."
+					unix_float: "Represent the timestamp as a Unix timestamp in floating point."
+					unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
+					unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
+					unix_us: "Represent the timestamp as a Unix timestamp in microseconds."
+				}
+			}
+		}
+	}
+	request: {
+		description: "Outbound HTTP request settings."
+		required: false
+		type: object: options: {
+			adaptive_concurrency: {
+				description: """
+					Configuration of adaptive concurrency parameters.
+
+					These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or
+					unstable performance and sink behavior. Proceed with caution.
+					"""
+				required: false
+				type: object: options: {
+					decrease_ratio: {
+						description: """
+							The fraction of the current value to set the new concurrency limit when decreasing the limit.
+
+							Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly
+							when latency increases.
+
+							**Note**: The new limit is rounded down after applying this ratio.
+							"""
+						required: false
+						type: float: default: 0.9
+					}
+					ewma_alpha: {
+						description: """
+							The weighting of new measurements compared to older measurements.
+
+							Valid values are greater than `0` and less than `1`.
+
+							ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with
+							the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has
+							unusually high response variability.
+							"""
+						required: false
+						type: float: default: 0.4
+					}
+					initial_concurrency: {
+						description: """
+							The initial concurrency limit to use. If not specified, the initial limit is 1 (no concurrency).
+
+							Datadog recommends setting this value to your service's average limit if you're seeing that it takes a
+							long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
+							`adaptive_concurrency_limit` metric.
+							"""
+						required: false
+						type: uint: default: 1
+					}
+					max_concurrency_limit: {
+						description: """
+							The maximum concurrency limit.
+
+							The adaptive request concurrency limit does not go above this bound. This is put in place as a safeguard.
+							"""
+						required: false
+						type: uint: default: 200
+					}
+					rtt_deviation_scale: {
+						description: """
+							Scale of RTT deviations which are not considered anomalous.
+
+							Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`.
+ + When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable + those values are. We use that deviation when comparing the past RTT average to the current measurements, so we + can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to + an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT. + """ + required: false + type: float: default: 2.5 + } + } + } + concurrency: { + description: """ + Configuration for outbound request concurrency. + + This can be set either to one of the below enum values or to a positive integer, which denotes + a fixed concurrency limit. + """ + required: false + type: { + string: { + default: "adaptive" + enum: { + adaptive: """ + Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature. + + [arc]: https://vector.dev/docs/architecture/arc/ + """ + none: """ + A fixed concurrency of 1. + + Only one request can be outstanding at any given time. + """ + } + } + uint: {} + } + } + headers: { + description: "Additional HTTP headers to add to every HTTP request." + required: false + type: object: { + examples: [{ + Accept: "text/plain" + "X-Event-Level": "{{level}}" + "X-Event-Timestamp": "{{timestamp}}" + "X-My-Custom-Header": "A-Value" + }] + options: "*": { + description: "An HTTP request header and its value. Both header names and values support templating with event data." + required: true + type: string: {} + } + } + } + rate_limit_duration_secs: { + description: "The time window used for the `rate_limit_num` option." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + rate_limit_num: { + description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." + required: false + type: uint: { + default: 9223372036854775807 + unit: "requests" + } + } + retry_attempts: { + description: "The maximum number of retries to make for failed requests." + required: false + type: uint: { + default: 9223372036854775807 + unit: "retries" + } + } + retry_initial_backoff_secs: { + description: """ + The amount of time to wait before attempting the first retry for a failed request. + + After the first retry has failed, the fibonacci sequence is used to select future backoffs. + """ + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + retry_jitter_mode: { + description: "The jitter mode to use for retry backoff behavior." + required: false + type: string: { + default: "Full" + enum: { + Full: """ + Full jitter. + + The random delay is anywhere from 0 up to the maximum current delay calculated by the backoff + strategy. + + Incorporating full jitter into your backoff strategy can greatly reduce the likelihood + of creating accidental denial of service (DoS) conditions against your own systems when + many clients are recovering from a failure state. + """ + None: "No jitter." + } + } + } + retry_max_duration_secs: { + description: "The maximum amount of time to wait between retries." + required: false + type: uint: { + default: 30 + unit: "seconds" + } + } + timeout_secs: { + description: """ + The time a request can take before being aborted. + + Datadog highly recommends that you do not lower this value below the service's internal timeout, as this could + create orphaned requests, pile on retries, and result in duplicate data downstream. 
+ """ + required: false + type: uint: { + default: 60 + unit: "seconds" + } + } + } + } + tls: { + description: "TLS configuration." + required: false + type: object: options: { + alpn_protocols: { + description: """ + Sets the list of supported ALPN protocols. + + Declare the supported ALPN protocols, which are used during negotiation with a peer. They are prioritized in the order + that they are defined. + """ + required: false + type: array: items: type: string: examples: ["h2"] + } + ca_file: { + description: """ + Absolute path to an additional CA certificate file. + + The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/certificate_authority.crt"] + } + crt_file: { + description: """ + Absolute path to a certificate file used to identify this server. + + The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as + an inline string in PEM format. + + If this is set _and_ is not a PKCS#12 archive, `key_file` must also be set. + """ + required: false + type: string: examples: ["/path/to/host_certificate.crt"] + } + key_file: { + description: """ + Absolute path to a private key file used to identify this server. + + The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/host_certificate.key"] + } + key_pass: { + description: """ + Passphrase used to unlock the encrypted key file. + + This has no effect unless `key_file` is set. + """ + required: false + type: string: examples: ["${KEY_PASS_ENV_VAR}", "PassWord1"] + } + server_name: { + description: """ + Server name to use when using Server Name Indication (SNI). + + Only relevant for outgoing connections. + """ + required: false + type: string: examples: ["www.example.com"] + } + verify_certificate: { + description: """ + Enables certificate verification. For components that create a server, this requires that the + client connections have a valid client certificate. For components that initiate requests, + this validates that the upstream has a valid certificate. + + If enabled, certificates must not be expired and must be issued by a trusted + issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the + certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and + so on, until the verification process reaches a root certificate. + + Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates. + """ + required: false + type: bool: {} + } + verify_hostname: { + description: """ + Enables hostname verification. + + If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by + the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. + + Only relevant for outgoing connections. + + Do NOT set this to `false` unless you understand the risks of not verifying the remote hostname. + """ + required: false + type: bool: {} + } + } + } +}