diff --git a/Cargo.lock b/Cargo.lock index eb53944590..f150e5ab4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -229,10 +229,13 @@ dependencies = [ "fe2o3-amqp-ext", "fe2o3-amqp-management", "fe2o3-amqp-types", + "native-tls", "serde", "serde_amqp", "serde_bytes", "tokio", + "tokio-native-tls", + "tokio-socks", "tracing", "tracing-subscriber", "typespec", @@ -377,6 +380,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "typespec_client_core", ] [[package]] @@ -2974,6 +2978,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-socks" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d4770b8024672c1101b3f6733eab95b18007dbe0847a8afe341fcf79e06043f" +dependencies = [ + "either", + "futures-util", + "thiserror 1.0.69", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.17" diff --git a/Cargo.toml b/Cargo.toml index a729af313f..b9b193e9ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -109,6 +109,7 @@ getrandom = { version = "0.3" } gloo-timers = { version = "0.3" } hmac = { version = "0.12" } litemap = "0.7.4" +native-tls = "0.2" openssl = { version = "0.10.72" } opentelemetry = { version = "0.30", features = ["trace"] } opentelemetry_sdk = "0.30" @@ -143,6 +144,8 @@ tokio = { version = "1.0", default-features = false, features = [ "macros", "time", ] } +tokio-native-tls = "0.3" +tokio-socks = "0.5" tracing = "0.1.40" tracing-subscriber = "0.3" url = "2.2" diff --git a/sdk/core/azure_core/src/http/mod.rs b/sdk/core/azure_core/src/http/mod.rs index 035017810a..8c3abe1c0c 100644 --- a/sdk/core/azure_core/src/http/mod.rs +++ b/sdk/core/azure_core/src/http/mod.rs @@ -26,6 +26,9 @@ pub use typespec_client_core::http::{ Method, NoFormat, StatusCode, Url, }; +pub use typespec::http::{DEFAULT_ALLOWED_HEADER_NAMES, REDACTED_PATTERN}; +pub use typespec_client_core::http::{Sanitizer, DEFAULT_ALLOWED_QUERY_PARAMETERS}; + pub use crate::error::check_success; #[cfg(feature = "xml")] pub use typespec_client_core::http::XmlFormat; diff --git a/sdk/core/azure_core_amqp/.dict.txt b/sdk/core/azure_core_amqp/.dict.txt index 1dd934bdc1..a68a63c2c2 100644 --- a/sdk/core/azure_core_amqp/.dict.txt +++ b/sdk/core/azure_core_amqp/.dict.txt @@ -4,3 +4,6 @@ amqps sastoken smallulong smalluint +proxyuser +testuser +testpass diff --git a/sdk/core/azure_core_amqp/Cargo.toml b/sdk/core/azure_core_amqp/Cargo.toml index 55e5470541..e9b6d15667 100644 --- a/sdk/core/azure_core_amqp/Cargo.toml +++ b/sdk/core/azure_core_amqp/Cargo.toml @@ -29,6 +29,9 @@ serde.workspace = true serde_amqp = { workspace = true, optional = true } serde_bytes = { workspace = true, optional = true } tokio.workspace = true +tokio-native-tls = { workspace = true, optional = true } +tokio-socks = { workspace = true, optional = true } +native-tls = { workspace = true, optional = true } tracing.workspace = true typespec = { workspace = true, features = ["amqp"] } typespec_macros.workspace = true @@ -49,9 +52,10 @@ fe2o3_amqp = [ "serde_amqp", "serde_bytes", ] +socks5 = ["dep:tokio-socks", "dep:native-tls", "dep:tokio-native-tls"] [lints] workspace = true [package.metadata.docs.rs] -features = ["fe2o3_amqp"] +features = ["fe2o3_amqp", "socks5"] diff --git a/sdk/core/azure_core_amqp/README.md b/sdk/core/azure_core_amqp/README.md index 9133de3575..f73428e750 100644 --- a/sdk/core/azure_core_amqp/README.md +++ b/sdk/core/azure_core_amqp/README.md @@ -6,6 +6,49 @@ Azure AMQP crate for consumption of AMQP based packages in the Azure SDK for Rus This crate is part of a collection of crates: for more information please refer to [https://github.com/azure/azure-sdk-for-rust](https://github.com/azure/azure-sdk-for-rust). +## SOCKS5 Proxy Support + +This crate supports SOCKS5 proxy connections for corporate environments. + +**Note**: SOCKS5 support requires enabling the `socks5` feature: + +```toml +[dependencies] +azure_core_amqp = { version = "0.8", features = ["socks5"] } +``` + +SOCKS5 support is enabled by configuring the `custom_endpoint` option with a SOCKS5 URL: + +```rust,no_run +use azure_core_amqp::AmqpConnectionOptions; + +# fn main() -> Result<(), Box> { +let options = AmqpConnectionOptions { + custom_endpoint: Some("socks5h://proxy.contoso.com:8080".parse()?), + ..Default::default() +}; +# Ok(()) +# } +``` + +### Supported Protocols + +- **socks5://** - Standard SOCKS5 with local DNS resolution +- **socks5h://** - SOCKS5 with proxy-side DNS resolution (recommended for corporate environments) + +### Authentication + +Username/password authentication is supported via the proxy URL: +```text +socks5://username:password@proxy.example.com:1080 +``` + +All proxy credentials are automatically masked in log output for security. + +### Dependencies + +SOCKS5 support adds the `tokio-socks` dependency to the crate. + ## Testing the AMQP Client The AMQP package is tested using the standard `cargo test` command line: diff --git a/sdk/core/azure_core_amqp/src/fe2o3/connection.rs b/sdk/core/azure_core_amqp/src/fe2o3/connection.rs index 349cecb585..e94cd596c5 100644 --- a/sdk/core/azure_core_amqp/src/fe2o3/connection.rs +++ b/sdk/core/azure_core_amqp/src/fe2o3/connection.rs @@ -4,12 +4,14 @@ use super::error::{Fe2o3ConnectionError, Fe2o3ConnectionOpenError, Fe2o3TransportError}; use crate::connection::{AmqpConnectionApis, AmqpConnectionOptions}; use crate::error::AmqpErrorKind; +#[cfg(feature = "socks5")] +use crate::socks5::SocksConnection; use crate::value::{AmqpOrderedMap, AmqpSymbol, AmqpValue}; use azure_core::{http::Url, Result}; use fe2o3_amqp::connection::ConnectionHandle; use std::{borrow::BorrowMut, sync::OnceLock}; use tokio::sync::Mutex; -use tracing::{debug, warn}; +use tracing::{debug, error, warn}; #[derive(Debug, Default)] pub(crate) struct Fe2o3AmqpConnection { @@ -44,6 +46,117 @@ impl Drop for Fe2o3AmqpConnection { } } +macro_rules! configure_builder { + ($id:expr, $url:expr, $options:expr) => {{ + let mut builder = fe2o3_amqp::Connection::builder() + .sasl_profile(fe2o3_amqp::sasl_profile::SaslProfile::Anonymous) + .alt_tls_establishment(true) + .container_id($id) + .max_frame_size(65536); + + if let Some(frame_size) = $options.max_frame_size { + builder = builder.max_frame_size(frame_size); + } + + if let Some(channel_max) = $options.channel_max { + builder = builder.channel_max(channel_max); + } + + if let Some(idle_timeout) = $options.idle_timeout { + builder = builder.idle_time_out(idle_timeout.whole_milliseconds() as u32); + } + + if let Some(outgoing_locales) = &$options.outgoing_locales { + builder = builder.set_outgoing_locales( + outgoing_locales + .iter() + .map(|s| fe2o3_amqp_types::primitives::Symbol::from(s.as_str())) + .collect(), + ); + } + + if let Some(incoming_locales) = &$options.incoming_locales { + builder = builder.set_incoming_locales( + incoming_locales + .iter() + .map(|s| fe2o3_amqp_types::primitives::Symbol::from(s.as_str())) + .collect(), + ); + } + + if let Some(offered_capabilities) = &$options.offered_capabilities { + builder = builder + .set_offered_capabilities(offered_capabilities.iter().map(Into::into).collect()); + } + + if let Some(desired_capabilities) = &$options.desired_capabilities { + builder = builder + .set_desired_capabilities(desired_capabilities.iter().map(Into::into).collect()); + } + + if let Some(properties) = &$options.properties { + builder = builder.properties( + properties + .iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(), + ); + } + + if let Some(buffer_size) = $options.buffer_size { + builder = builder.buffer_size(buffer_size); + } + + // Set hostname if using custom endpoint + if $options.custom_endpoint.is_some() { + builder = builder.hostname($url.host_str()); + } + + builder + }}; +} + +#[cfg(feature = "socks5")] +async fn prepare_socks5_connection( + id: &str, + url: &Url, + endpoint: &Url, +) -> Result>> { + if endpoint.scheme() == "socks5" || endpoint.scheme() == "socks5h" { + debug!( + connection_id = %id, + proxy_scheme = %endpoint.scheme(), + target_host = %url.host_str().unwrap_or("unknown"), + "Opening AMQP connection through SOCKS5 proxy" + ); + + let stream = SocksConnection::connect(endpoint, url).await.map_err(|e| { + error!( + connection_id = %id, + proxy_url = %SocksConnection::mask_credentials(endpoint), + error = %e, + "Failed to establish SOCKS5 connection" + ); + e + })?; + + Ok(Some(stream)) + } else { + Ok(None) + } +} + +#[cfg(not(feature = "socks5"))] +async fn validate_no_socks5(endpoint: &Url) -> Result<()> { + if endpoint.scheme() == "socks5" || endpoint.scheme() == "socks5h" { + return Err(azure_core::Error::with_message( + azure_core::error::ErrorKind::Amqp, + "SOCKS5 proxy support is not enabled. Enable the 'socks5' feature to use SOCKS5 proxies." + )); + } + Ok(()) +} + #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] impl AmqpConnectionApis for Fe2o3AmqpConnection { @@ -57,70 +170,59 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection { let options = options.unwrap_or_default(); let mut endpoint = url.clone(); - // All AMQP clients have a similar set of options. - let mut builder = fe2o3_amqp::Connection::builder() - .sasl_profile(fe2o3_amqp::sasl_profile::SaslProfile::Anonymous) - .alt_tls_establishment(true) - .container_id(id) - .max_frame_size(65536); - - if let Some(frame_size) = options.max_frame_size { - builder = builder.max_frame_size(frame_size); + if let Some(custom_endpoint) = options.custom_endpoint.clone() { + endpoint = custom_endpoint; } - if let Some(channel_max) = options.channel_max { - builder = builder.channel_max(channel_max); - } - if let Some(idle_timeout) = options.idle_timeout { - builder = builder.idle_time_out(idle_timeout.whole_milliseconds() as u32); - } - if let Some(outgoing_locales) = options.outgoing_locales { - builder = builder.set_outgoing_locales( - outgoing_locales - .into_iter() - .map(fe2o3_amqp_types::primitives::Symbol::from) - .collect(), - ); - } - if let Some(incoming_locales) = options.incoming_locales { - builder = builder.set_incoming_locales( - incoming_locales - .into_iter() - .map(fe2o3_amqp_types::primitives::Symbol::from) - .collect(), - ); - } - if let Some(offered_capabilities) = options.offered_capabilities { - builder = builder.set_offered_capabilities( - offered_capabilities.into_iter().map(Into::into).collect(), - ); - } - if let Some(desired_capabilities) = options.desired_capabilities { - builder = builder.set_desired_capabilities( - desired_capabilities.into_iter().map(Into::into).collect(), - ); - } - if let Some(properties) = options.properties { - builder = builder.properties( - properties - .iter() - .map(|(k, v)| (k.into(), v.into())) - .collect(), - ); - } - if let Some(buffer_size) = options.buffer_size { - builder = builder.buffer_size(buffer_size); - } + let connection_type = if endpoint.scheme().starts_with("socks5") { + "socks5" + } else { + "direct" + }; - if let Some(custom_endpoint) = options.custom_endpoint { - endpoint = custom_endpoint; - builder = builder.hostname(url.host_str()); - } + let connection = { + #[cfg(feature = "socks5")] + { + let stream = prepare_socks5_connection(&id, &url, &endpoint).await?; + let builder = configure_builder!(&id, &url, &options); + + if let Some(stream) = stream { + debug!(connection_id = %id, "Opening AMQP connection with SOCKS5 stream"); + builder.open_with_stream(stream).await.map_err(|e| { + error!(connection_id = %id, error = %e, "Failed to open AMQP connection over SOCKS5 stream"); + azure_core::Error::from(Fe2o3ConnectionOpenError(e)) + })? + } else { + debug!(connection_id = %id, endpoint = %endpoint, "Opening direct AMQP connection"); + let endpoint_str = endpoint.to_string(); + builder.open(endpoint).await.map_err(|e| { + error!(connection_id = %id, endpoint = %endpoint_str, error = %e, "Failed to open direct AMQP connection"); + azure_core::Error::from(Fe2o3ConnectionOpenError(e)) + })? + } + } + #[cfg(not(feature = "socks5"))] + { + validate_no_socks5(&endpoint).await?; + let builder = configure_builder!(&id, &url, &options); + + debug!(connection_id = %id, endpoint = %endpoint, "Opening direct AMQP connection"); + let endpoint_str = endpoint.to_string(); + builder.open(endpoint).await.map_err(|e| { + error!(connection_id = %id, endpoint = %endpoint_str, error = %e, "Failed to open direct AMQP connection"); + azure_core::Error::from(Fe2o3ConnectionOpenError(e)) + })? + } + }; + + debug!( + connection_id = %id, + connection_type = %connection_type, + "AMQP connection opened successfully" + ); self.connection - .set(Mutex::new(builder.open(endpoint).await.map_err(|e| { - azure_core::Error::from(Fe2o3ConnectionOpenError(e)) - })?)) + .set(Mutex::new(connection)) .map_err(|_| Self::connection_already_set())?; Ok(()) } diff --git a/sdk/core/azure_core_amqp/src/lib.rs b/sdk/core/azure_core_amqp/src/lib.rs index 2a9d7664bc..ac419092a8 100644 --- a/sdk/core/azure_core_amqp/src/lib.rs +++ b/sdk/core/azure_core_amqp/src/lib.rs @@ -21,6 +21,8 @@ mod receiver; mod sender; mod session; mod simple_value; +#[cfg(feature = "socks5")] +mod socks5; mod value; pub use cbs::{AmqpClaimsBasedSecurity, AmqpClaimsBasedSecurityApis}; diff --git a/sdk/core/azure_core_amqp/src/socks5.rs b/sdk/core/azure_core_amqp/src/socks5.rs new file mode 100644 index 0000000000..373148b43e --- /dev/null +++ b/sdk/core/azure_core_amqp/src/socks5.rs @@ -0,0 +1,676 @@ +// Copyright (c) Microsoft Corporation. All Rights reserved +// Licensed under the MIT license. + +//! SOCKS5 proxy support module. + +use azure_core::{ + error::Result, + http::{Sanitizer, Url}, +}; +use native_tls::TlsConnector as NativeTlsConnector; +use std::collections::HashSet; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_native_tls::TlsConnector; +use tokio_socks::{tcp::Socks5Stream, TargetAddr}; +use tracing::{debug, error, trace}; + +/// A trait that combines AsyncRead, AsyncWrite, Unpin, Send and Debug for SOCKS5 streams +pub trait SocksStream: AsyncRead + AsyncWrite + Unpin + Send + std::fmt::Debug + 'static {} + +impl SocksStream for T where T: AsyncRead + AsyncWrite + Unpin + Send + std::fmt::Debug + 'static {} + +/// A wrapper that implements the SocksStream trait +#[derive(Debug)] +pub struct StreamWrapper(pub T); + +impl AsyncRead for StreamWrapper +where + T: AsyncRead + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) + } +} + +impl AsyncWrite for StreamWrapper +where + T: AsyncWrite + Unpin, +{ + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_shutdown(cx) + } +} + +/// SOCKS5 connection helper for azure_core_amqp +/// +/// Supports both SOCKS5 and SOCKS5h protocols for connecting to AMQP services through proxy servers: +/// - `socks5://` - Local DNS resolution (client resolves target hostname) +/// - `socks5h://` - Proxy-side DNS resolution (proxy resolves target hostname, recommended for privacy) +/// +/// # Protocol Differences +/// +/// **SOCKS5 (`socks5://`)**: +/// - Client performs DNS resolution of target hostname +/// - Target IP address sent through proxy tunnel +/// - Faster connection establishment +/// - DNS queries visible to local network monitoring +/// +/// **SOCKS5h (`socks5h://`)**: +/// - Proxy performs DNS resolution of target hostname +/// - Target hostname sent through proxy tunnel +/// - Enhanced privacy (DNS queries hidden from local network) +/// - Recommended for corporate environments +/// +/// # Authentication +/// +/// Supports username/password authentication via proxy URL: +/// ```text +/// socks5://username:password@proxy.example.com:1080 +/// socks5h://user:pass@corporate-proxy.internal:8080 +/// ``` +/// +/// # Examples +/// +/// ## Basic Connection +/// ```ignore +/// use azure_core::http::Url; +/// use azure_core_amqp::socks5::SocksConnection; +/// +/// async fn connect_through_proxy() -> azure_core::Result<()> { +/// let proxy_url = Url::parse("socks5h://proxy.example.com:1080")?; +/// let target_url = Url::parse("amqps://eventhub.servicebus.windows.net")?; +/// +/// let stream = SocksConnection::connect(&proxy_url, &target_url).await?; +/// // Use stream for AMQP connection +/// Ok(()) +/// } +/// ``` +/// +/// ## Authenticated Connection +/// ```ignore +/// use azure_core::http::Url; +/// use azure_core_amqp::socks5::SocksConnection; +/// +/// async fn connect_with_auth() -> azure_core::Result<()> { +/// let proxy_url = Url::parse("socks5://username:password@proxy.corp.com:8080")?; +/// let target_url = Url::parse("amqps://my-eventhub.servicebus.windows.net")?; +/// +/// let stream = SocksConnection::connect(&proxy_url, &target_url).await?; +/// // Credentials are automatically masked in logs for security +/// Ok(()) +/// } +/// ``` +/// +/// # Security Considerations +/// +/// - Proxy credentials are automatically masked in all log output +/// - Use SOCKS5h protocol for enhanced DNS privacy in corporate environments +/// - Ensure proxy URL is obtained from secure configuration sources +/// - Consider using environment variables for sensitive proxy credentials +/// +/// # Error Handling +/// +/// Connection failures can occur due to: +/// - Invalid proxy URL format (use [`validate_proxy_url`](Self::validate_proxy_url)) +/// - Network connectivity issues to proxy server +/// - Authentication failures with proxy server +/// - Target service unreachable through proxy +/// +/// All errors include contextual information for troubleshooting. +pub(crate) struct SocksConnection; + +impl SocksConnection { + /// Validate SOCKS5 proxy URL format and requirements + /// + /// Ensures the provided URL is properly formatted for SOCKS5 proxy connections. + /// This function should be called before attempting to establish a connection + /// to catch configuration errors early. + /// + /// # Supported Schemes + /// + /// - `socks5://` - Standard SOCKS5 protocol with local DNS resolution + /// - `socks5h://` - SOCKS5 protocol with proxy-side DNS resolution + /// + /// # Required Components + /// + /// - **Scheme**: Must be `socks5` or `socks5h` + /// - **Host**: Proxy server hostname or IP address + /// - **Port**: Optional (defaults to 1080 if not specified) + /// - **Authentication**: Optional username:password + /// + /// # Examples + /// + /// ```ignore + /// use azure_core::http::Url; + /// use azure_core_amqp::socks5::SocksConnection; + /// + /// // Valid URLs + /// assert!(SocksConnection::validate_proxy_url(&Url::parse("socks5://proxy.example.com").unwrap()).is_ok()); + /// assert!(SocksConnection::validate_proxy_url(&Url::parse("socks5h://proxy:8080").unwrap()).is_ok()); + /// assert!(SocksConnection::validate_proxy_url(&Url::parse("socks5://user:pass@proxy.corp.com:1080").unwrap()).is_ok()); + /// + /// // Invalid URLs + /// assert!(SocksConnection::validate_proxy_url(&Url::parse("http://proxy.example.com").unwrap()).is_err()); + /// assert!(SocksConnection::validate_proxy_url(&Url::parse("socks5://").unwrap()).is_err()); // Missing host + /// ``` + /// + /// # Errors + /// + /// Returns an error if: + /// - URL scheme is not `socks5` or `socks5h` + /// - Host component is missing or empty + /// - URL is malformed + /// + /// # Security Note + /// + /// This function does not validate the actual connectivity to the proxy server + /// or the correctness of authentication credentials. It only validates the URL format. + pub fn validate_proxy_url(url: &Url) -> Result<()> { + trace!(proxy_url = %url, "Validating SOCKS5 proxy URL format"); + + if !["socks5", "socks5h"].contains(&url.scheme()) { + error!( + proxy_scheme = %url.scheme(), + "Invalid SOCKS5 proxy scheme" + ); + return Err(azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + format!("Invalid SOCKS5 scheme: {}", url.scheme()), + )); + } + if url.host_str().is_none() { + error!("Missing host in SOCKS5 proxy URL"); + return Err(azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + "Missing host in SOCKS5 URL", + )); + } + Ok(()) + } + + /// Create SOCKS5 connection to target through proxy + /// + /// Establishes a connection to the target service through a SOCKS5 proxy server. + /// This function handles both SOCKS5 and SOCKS5h protocols, authentication, + /// and TLS wrapping for AMQPS connections. + /// + /// # Protocol Flow + /// + /// 1. **Validation**: Validates proxy URL format + /// 2. **Proxy Connection**: Connects to SOCKS5 proxy server + /// 3. **Authentication**: Performs username/password auth if credentials provided + /// 4. **Target Request**: Requests connection to target through proxy tunnel + /// 5. **TLS Wrapping**: Applies TLS if target uses AMQPS protocol + /// + /// # Parameters + /// + /// - `proxy_url`: SOCKS5 proxy server URL with optional authentication + /// - `target_url`: Target AMQP service URL (amqp:// or amqps://) + /// + /// # Examples + /// + /// ## Basic Proxy Connection + /// ```ignore,no_run + /// use azure_core::http::Url; + /// use azure_core_amqp::socks5::SocksConnection; + /// + /// async fn basic_connection() -> azure_core::Result<()> { + /// let proxy_url = Url::parse("socks5h://corporate-proxy.internal:8080")?; + /// let target_url = Url::parse("amqps://my-eventhub.servicebus.windows.net:5671")?; + /// + /// let stream = SocksConnection::connect(&proxy_url, &target_url).await?; + /// // Stream is ready for fe2o3-amqp Connection::open_with_stream() + /// Ok(()) + /// } + /// ``` + /// + /// ## Authenticated Connection + /// ```ignore,no_run + /// use azure_core::http::Url; + /// use azure_core_amqp::socks5::SocksConnection; + /// + /// async fn authenticated_connection() -> azure_core::Result<()> { + /// let proxy_url = Url::parse("socks5://proxyuser:proxypass@proxy.corp.com:1080")?; + /// let target_url = Url::parse("amqps://eventhub.servicebus.windows.net")?; + /// + /// let stream = SocksConnection::connect(&proxy_url, &target_url).await?; + /// // Authentication handled automatically, credentials masked in logs + /// Ok(()) + /// } + /// ``` + /// + /// # TLS Handling + /// + /// - **AMQPS targets** (port 5671 or amqps:// scheme): Automatically wrapped with TLS + /// - **AMQP targets** (port 5672 or amqp:// scheme): Plain TCP connection + /// - TLS handshake performed after SOCKS5 tunnel establishment + /// + /// # Error Scenarios + /// + /// This function may fail due to: + /// - **Invalid proxy URL**: Use [`validate_proxy_url`](Self::validate_proxy_url) first + /// - **Network errors**: Proxy server unreachable or connection refused + /// - **Authentication failures**: Invalid username/password for proxy + /// - **Proxy errors**: Target service unreachable through proxy + /// - **TLS errors**: Certificate validation failures for AMQPS connections + /// + /// # Security Features + /// + /// - **Credential masking**: Proxy credentials automatically masked in all log output + /// - **TLS validation**: Full certificate chain validation for AMQPS connections + /// - **DNS privacy**: SOCKS5h protocol hides DNS queries from local network + /// + /// # Performance Characteristics + /// + /// - **Connection pooling**: Caller responsible for connection reuse + /// - **Memory usage**: Minimal overhead, stream-based design + /// - **Latency**: Additional hop through proxy server + /// + /// # Returns + /// + /// Returns a boxed stream implementing [`SocksStream`] trait, ready for use + /// with fe2o3-amqp's `Connection::open_with_stream()` method. + pub async fn connect(proxy_url: &Url, target_url: &Url) -> Result> { + debug!( + proxy_url = %Self::mask_credentials(proxy_url), + target_host = %target_url.host_str().unwrap_or("unknown"), + target_port = target_url.port().unwrap_or(5671), + "Establishing SOCKS5 connection to EventHub" + ); + + // Validate proxy URL format + Self::validate_proxy_url(proxy_url)?; + + // DNS resolution happens at proxy server for socks5h:// scheme + let proxy_host = proxy_url.host_str().ok_or_else(|| { + error!("Missing proxy host in SOCKS5 URL"); + azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + "Missing proxy host in SOCKS5 URL", + ) + })?; + let proxy_port = proxy_url.port().unwrap_or(1080); + + debug!( + proxy_host = %proxy_host, + proxy_port = proxy_port, + dns_resolution = %if proxy_url.scheme() == "socks5h" { "proxy-side" } else { "local" }, + "Connecting to SOCKS5 proxy" + ); + + // Always use domain name - let SOCKS5 proxy handle resolution + let target_addr = TargetAddr::Domain( + target_url.host_str().unwrap_or("").into(), + target_url.port().unwrap_or(5671), + ); + + // Handle authentication if provided in URL + let stream = if !proxy_url.username().is_empty() { + let username = proxy_url.username(); + let password = proxy_url.password().unwrap_or(""); + if username.is_empty() { + error!("Empty username in SOCKS5 proxy URL"); + return Err(azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + "Empty username in SOCKS5 URL", + )); + } + + debug!( + username = %username, + "Authenticating SOCKS5 connection with credentials" + ); + + Socks5Stream::connect_with_password( + (proxy_host, proxy_port), + target_addr, + username, + password, + ) + .await + } else { + debug!("Connecting to SOCKS5 proxy without authentication"); + Socks5Stream::connect((proxy_host, proxy_port), target_addr).await + } + .map_err(|e| { + error!( + proxy_url = %Self::mask_credentials(proxy_url), + target_url = %target_url.to_string(), + error = %e, + "SOCKS5 connection establishment failed" + ); + + azure_core::Error::with_error( + azure_core::error::ErrorKind::Other, + e, + format!( + "SOCKS5 connection failed: proxy={}, target={}", + Self::mask_credentials(proxy_url), + target_url + ), + ) + })?; + + debug!( + proxy_url = %Self::mask_credentials(proxy_url), + target_url = %target_url.to_string(), + "SOCKS5 connection established successfully" + ); + + // Check if target URL requires TLS (amqps://) + let requires_tls = + target_url.scheme() == "amqps" || target_url.port().unwrap_or(5671) == 5671; + + if requires_tls { + debug!( + target_host = %target_url.host_str().unwrap_or("unknown"), + "Wrapping SOCKS5 stream with TLS for AMQPS connection" + ); + + // Create TLS connector with default settings + let native_connector = NativeTlsConnector::new().map_err(|e| { + error!( + error = %e, + "Failed to create TLS connector" + ); + azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + format!("Failed to create TLS connector: {}", e), + ) + })?; + + let connector = TlsConnector::from(native_connector); + let target_host = target_url.host_str().ok_or_else(|| { + error!("Missing target host for TLS connection"); + azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + "Missing target host for TLS connection", + ) + })?; + + // Establish TLS connection over SOCKS5 stream + let tls_stream = connector + .connect(target_host, stream.into_inner()) + .await + .map_err(|e| { + error!( + target_host = %target_host, + error = %e, + "TLS handshake failed over SOCKS5 connection" + ); + azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + format!("TLS handshake failed: {}", e), + ) + })?; + + debug!( + target_host = %target_host, + "TLS connection established over SOCKS5 tunnel" + ); + + trace!("SOCKS5+TLS stream ready for AMQP protocol"); + Ok(Box::new(StreamWrapper(tls_stream))) + } else { + // Plain TCP connection for non-TLS protocols + let inner = stream.into_inner(); + trace!("SOCKS5 stream extracted and ready for plain AMQP protocol"); + Ok(Box::new(StreamWrapper(inner))) + } + } + + /// Mask credentials in proxy URL for logging + /// + /// Creates a safe representation of the proxy URL with credentials masked + /// for inclusion in log messages and error reports. This prevents sensitive + /// authentication information from being exposed in logs or debug output. + /// + /// # Security Rationale + /// + /// Proxy URLs often contain sensitive authentication credentials: + /// ```text + /// socks5://username:password@proxy.corp.com:1080 + /// ``` + /// + /// Direct logging of such URLs would expose credentials in: + /// - Application logs + /// - Error messages + /// - Debug output + /// - Monitoring systems + /// + /// # Masking Strategy + /// + /// - **Username**: Replaced with `***` + /// - **Password**: Replaced with `***` + /// - **Host/Port**: Preserved for debugging + /// - **Scheme**: Preserved for protocol identification + /// + /// # Examples + /// + /// ```ignore + /// use azure_core::http::Url; + /// use azure_core_amqp::socks5::SocksConnection; + /// + /// let url_with_auth = Url::parse("socks5://user:pass@proxy.example.com:1080").unwrap(); + /// let masked = SocksConnection::mask_credentials(&url_with_auth); + /// assert_eq!(masked, "socks5://***:***@proxy.example.com:1080"); + /// + /// let url_no_auth = Url::parse("socks5://proxy.example.com:1080").unwrap(); + /// let masked = SocksConnection::mask_credentials(&url_no_auth); + /// assert_eq!(masked, "socks5://proxy.example.com:1080"); + /// ``` + /// + /// # Error Handling + /// + /// If the URL cannot be parsed or modified, returns `"invalid_url"` as a + /// safe fallback to prevent any credential exposure. + /// + /// # Usage + /// + /// This function is automatically used in all logging statements within + /// the SOCKS5 implementation. Manual usage is typically not required + /// unless implementing custom logging or error handling. + pub(crate) fn mask_credentials(url: &Url) -> String { + let mut masked = url.clone(); + if masked.username() != "" { + let _ = masked.set_username("***"); + } + if masked.password().is_some() { + let _ = masked.set_password(Some("***")); + } + masked.sanitize(&HashSet::new()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use azure_core::http::Url; + + #[test] + fn test_validate_proxy_url_valid_schemes() { + // Test valid SOCKS5 schemes + let socks5_url = Url::parse("socks5://proxy.example.com:1080").unwrap(); + assert!(SocksConnection::validate_proxy_url(&socks5_url).is_ok()); + + let socks5h_url = Url::parse("socks5h://proxy.example.com:1080").unwrap(); + assert!(SocksConnection::validate_proxy_url(&socks5h_url).is_ok()); + + // Test with default port + let socks5_no_port = Url::parse("socks5://proxy.example.com").unwrap(); + assert!(SocksConnection::validate_proxy_url(&socks5_no_port).is_ok()); + + // Test with authentication + let socks5_auth = Url::parse("socks5://user:pass@proxy.example.com:8080").unwrap(); + assert!(SocksConnection::validate_proxy_url(&socks5_auth).is_ok()); + } + + #[test] + fn test_validate_proxy_url_invalid_schemes() { + // Test invalid schemes + let http_url = Url::parse("http://proxy.example.com:8080").unwrap(); + assert!(SocksConnection::validate_proxy_url(&http_url).is_err()); + + let https_url = Url::parse("https://proxy.example.com:443").unwrap(); + assert!(SocksConnection::validate_proxy_url(&https_url).is_err()); + + let ftp_url = Url::parse("ftp://proxy.example.com:21").unwrap(); + assert!(SocksConnection::validate_proxy_url(&ftp_url).is_err()); + + let amqp_url = Url::parse("amqp://eventhub.servicebus.windows.net:5672").unwrap(); + assert!(SocksConnection::validate_proxy_url(&amqp_url).is_err()); + } + + #[test] + fn test_validate_proxy_url_missing_host() { + // Test URLs without host component + let no_host_url = Url::parse("socks5:///path").unwrap(); + assert!(SocksConnection::validate_proxy_url(&no_host_url).is_err()); + + // Test URL with no host specified - this actually parses but has None host + let empty_host_url = Url::parse("socks5://").unwrap(); + assert!(SocksConnection::validate_proxy_url(&empty_host_url).is_err()); + + // Test URLs that fail to parse due to empty host + assert!(Url::parse("socks5://:1080").is_err()); // EmptyHost error as expected + + // These tests verify our validation catches missing hosts in URLs that do parse + let relative_url = Url::parse("socks5:///").unwrap(); + assert!(SocksConnection::validate_proxy_url(&relative_url).is_err()); + } + + #[test] + fn test_validate_proxy_url_edge_cases() { + // Test with IP address + let ip_url = Url::parse("socks5://192.168.1.100:1080").unwrap(); + assert!(SocksConnection::validate_proxy_url(&ip_url).is_ok()); + + // Test with IPv6 + let ipv6_url = Url::parse("socks5h://[::1]:1080").unwrap(); + assert!(SocksConnection::validate_proxy_url(&ipv6_url).is_ok()); + + // Test with complex hostname + let complex_host = Url::parse("socks5://proxy-server.corp.example.com:8080").unwrap(); + assert!(SocksConnection::validate_proxy_url(&complex_host).is_ok()); + } + + #[test] + fn test_mask_credentials_no_auth() { + // URLs without authentication should remain unchanged + let url_no_auth = Url::parse("socks5://proxy.example.com:1080").unwrap(); + let masked = SocksConnection::mask_credentials(&url_no_auth); + assert_eq!(masked, "socks5://proxy.example.com:1080"); + + let url_no_port = Url::parse("socks5h://proxy.corp.com").unwrap(); + let masked_no_port = SocksConnection::mask_credentials(&url_no_port); + assert_eq!(masked_no_port, "socks5h://proxy.corp.com"); + } + + #[test] + fn test_mask_credentials_with_auth() { + // Username and password should be masked + let url_with_auth = + Url::parse("socks5://username:password@proxy.example.com:1080").unwrap(); + let masked = SocksConnection::mask_credentials(&url_with_auth); + assert_eq!(masked, "socks5://***:***@proxy.example.com:1080"); + + // Test with SOCKS5h + let url_socks5h = Url::parse("socks5h://user:pass@proxy.corp.com:8080").unwrap(); + let masked_socks5h = SocksConnection::mask_credentials(&url_socks5h); + assert_eq!(masked_socks5h, "socks5h://***:***@proxy.corp.com:8080"); + } + + #[test] + fn test_mask_credentials_username_only() { + // Test URL with username but no password + let url_user_only = Url::parse("socks5://username@proxy.example.com:1080").unwrap(); + let masked = SocksConnection::mask_credentials(&url_user_only); + assert_eq!(masked, "socks5://***@proxy.example.com:1080"); + } + + #[test] + fn test_mask_credentials_special_characters() { + // Test credentials with special characters (URL encoded) + let url_special = + Url::parse("socks5://user%40domain:p%40ssw0rd@proxy.example.com:1080").unwrap(); + let masked = SocksConnection::mask_credentials(&url_special); + assert_eq!(masked, "socks5://***:***@proxy.example.com:1080"); + + // Test with complex credentials + let url_complex = + Url::parse("socks5://admin:secretP%40ss123@proxy-server.corp.com:8080").unwrap(); + let masked_complex = SocksConnection::mask_credentials(&url_complex); + assert_eq!( + masked_complex, + "socks5://***:***@proxy-server.corp.com:8080" + ); + } + + #[test] + fn test_mask_credentials_preserves_structure() { + // Verify that masking preserves host, port, and scheme + let original = + Url::parse("socks5h://testuser:testpass@corporate-proxy.internal:12345").unwrap(); + let masked = SocksConnection::mask_credentials(&original); + + assert!(masked.starts_with("socks5h://")); + assert!(masked.contains("corporate-proxy.internal:12345")); + assert!(masked.contains("***:***")); + assert!(!masked.contains("testuser")); + assert!(!masked.contains("testpass")); + } + + #[test] + fn test_mask_credentials_empty_credentials() { + // Test edge case with empty username/password + let url_empty_user = Url::parse("socks5://:password@proxy.example.com:1080").unwrap(); + let masked_empty_user = SocksConnection::mask_credentials(&url_empty_user); + // Empty username should still be masked for consistency + assert!(masked_empty_user.contains("***")); + + let url_empty_pass = Url::parse("socks5://username:@proxy.example.com:1080").unwrap(); + let masked_empty_pass = SocksConnection::mask_credentials(&url_empty_pass); + assert!(masked_empty_pass.contains("***")); + } + + #[test] + fn test_mask_credentials_error_handling() { + // Test behavior with various URL scenarios + let urls_to_test = vec![ + "socks5://user:pass@proxy.example.com:1080", + "socks5h://admin:secret@192.168.1.100:8080", + "socks5://test@proxy.corp.com", + "socks5h://proxy.example.com:1080", + ]; + + for url_str in urls_to_test { + let url = Url::parse(url_str).unwrap(); + let masked = SocksConnection::mask_credentials(&url); + + // Verify credentials are not exposed + assert!(!masked.contains("pass")); + assert!(!masked.contains("secret")); + assert!( + !masked.contains("admin") || url_str.contains("admin") == masked.contains("admin") + ); + } + } +} diff --git a/sdk/eventhubs/.dict.txt b/sdk/eventhubs/.dict.txt index 0753be692a..78f9c3358c 100644 --- a/sdk/eventhubs/.dict.txt +++ b/sdk/eventhubs/.dict.txt @@ -5,3 +5,4 @@ mgmt rustc myapp yourcontainername +sockd diff --git a/sdk/eventhubs/azure_messaging_eventhubs/Cargo.toml b/sdk/eventhubs/azure_messaging_eventhubs/Cargo.toml index 6545a19516..866d6fdf00 100644 --- a/sdk/eventhubs/azure_messaging_eventhubs/Cargo.toml +++ b/sdk/eventhubs/azure_messaging_eventhubs/Cargo.toml @@ -26,6 +26,7 @@ futures.workspace = true rand.workspace = true rand_chacha.workspace = true tracing.workspace = true +typespec_client_core = { workspace = true } [build-dependencies] rustc_version.workspace = true @@ -44,6 +45,7 @@ tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } [features] in_memory_checkpoint_store = [] +socks5 = ["azure_core_amqp/socks5"] default = ["azure_core_amqp/default"] [[bench]] diff --git a/sdk/eventhubs/azure_messaging_eventhubs/examples/eventhubs_socks5_proxy.rs b/sdk/eventhubs/azure_messaging_eventhubs/examples/eventhubs_socks5_proxy.rs new file mode 100644 index 0000000000..15ce5b2fde --- /dev/null +++ b/sdk/eventhubs/azure_messaging_eventhubs/examples/eventhubs_socks5_proxy.rs @@ -0,0 +1,148 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//! This sample demonstrates how to connect to an Event Hub through a SOCKS5 proxy using the `ProducerClient`. +//! +//! # SOCKS5 Proxy Setup +//! +//! To run this example, you need: +//! +//! 1. **A running SOCKS5 proxy server**: +//! ```bash +//! # Example using SSH tunnel +//! ssh -D 8080 user@proxy-server.example.com +//! +//! # Or using a dedicated SOCKS5 proxy like Dante +//! sockd -D -p 8080 +//! ``` +//! +//! 2. **Environment variables**: +//! ```bash +//! export EVENTHUBS_HOST="your-eventhub.servicebus.windows.net" +//! export EVENTHUB_NAME="your-eventhub-name" +//! export SOCKS5_PROXY_URL="socks5h://my-proxy-domain:12345" # Optional, defaults to socks5h://my-proxy-domain:12345 +//! export SOCKS5_PROXY_URL="socks5://user:pass@my-proxy-domain:12345" # With authentication +//! ``` +//! +//! # Protocol Support +//! +//! - **socks5://** - Standard SOCKS5 with local DNS resolution +//! - **socks5h://** - SOCKS5 with proxy-side DNS resolution (recommended for corporate environments) +//! +//! # Usage +//! +//! **Note**: This example requires the `socks5` feature to be enabled. +//! +//! ```bash +//! cargo run --features socks5 --example eventhubs_socks5_proxy +//! ``` + +use azure_identity::DeveloperToolsCredential; +use azure_messaging_eventhubs::ProducerClient; +use std::env; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize tracing subscriber for logging + tracing_subscriber::fmt::init(); + + // Get EventHub connection details + let eventhub_namespace = + env::var("EVENTHUBS_HOST").expect("EVENTHUBS_HOST environment variable must be set"); + let eventhub_name = + env::var("EVENTHUB_NAME").expect("EVENTHUB_NAME environment variable must be set"); + + // Get SOCKS5 proxy URL (with fallback default) + let proxy_url = env::var("SOCKS5_PROXY_URL").unwrap_or_else(|_| { + warn!("SOCKS5_PROXY_URL not set, using default: socks5h://my-proxy-domain:12345"); + "socks5h://my-proxy-domain:12345".to_string() + }); + + info!( + eventhub_host = %eventhub_namespace, + eventhub_name = %eventhub_name, + proxy_url = %mask_proxy_credentials(&proxy_url), + "Connecting to EventHub through SOCKS5 proxy" + ); + + // Create credential + let credential = DeveloperToolsCredential::new(None)?; + + // Create producer client with SOCKS5 proxy + let client = ProducerClient::builder() + .with_application_id("socks5_proxy_example".to_string()) + .with_custom_endpoint(proxy_url) + .open( + eventhub_namespace.as_str(), + eventhub_name.as_str(), + credential, + ) + .await?; + + info!("Successfully created producer client through SOCKS5 proxy"); + + // Send a test message through the proxy + let test_message = format!( + "Hello from SOCKS5 proxy at {:?}", + std::time::SystemTime::now() + ); + client.send_event(test_message.as_str(), None).await?; + + info!( + message = %test_message, + "Successfully sent message through SOCKS5 proxy" + ); + + // Clean up + client.close().await?; + info!("Closed producer client connection"); + + Ok(()) +} + +/// Mask credentials in proxy URL for secure logging +/// +/// This function replaces username and password in the proxy URL with asterisks +/// to prevent credential exposure in log files. +fn mask_proxy_credentials(url: &str) -> String { + use azure_core::http::{Sanitizer, Url}; + use std::collections::HashSet; + + if let Ok(parsed_url) = Url::parse(url) { + let mut masked = parsed_url.clone(); + if !parsed_url.username().is_empty() { + let _ = masked.set_username("***"); + } + if parsed_url.password().is_some() { + let _ = masked.set_password(Some("***")); + } + masked.sanitize(&HashSet::new()) + } else { + // If URL parsing fails, return a safe placeholder + "invalid_proxy_url".to_string() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_mask_proxy_credentials() { + // Test masking credentials + let url_with_auth = "socks5://user:password@proxy.example.com:1080"; + let masked = mask_proxy_credentials(url_with_auth); + assert_eq!(masked, "socks5://***:***@proxy.example.com:1080"); + + // Test URL without credentials + let url_no_auth = "socks5h://proxy.example.com:1080"; + let unmasked = mask_proxy_credentials(url_no_auth); + assert_eq!(unmasked, "socks5h://proxy.example.com:1080"); + + // Test invalid URL + let invalid_url = "not-a-url"; + let result = mask_proxy_credentials(invalid_url); + assert_eq!(result, "invalid_proxy_url"); + } +}