Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions temporalio/bridge/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from dataclasses import dataclass
from datetime import timedelta
from typing import Mapping, Optional, Tuple, Type, TypeVar
from typing import Mapping, Optional, Tuple, Type, TypeVar, Union

import google.protobuf.message

Expand Down Expand Up @@ -59,7 +59,7 @@ class ClientConfig:
"""Python representation of the Rust struct for configuring the client."""

target_url: str
metadata: Mapping[str, str]
metadata: Mapping[str, Union[str, bytes]]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to use str | bytes luckily ran into a CI issue about Python 3.9 incompatibility (which I hadn't realized). These all use Union[str, bytes] as a result.

api_key: Optional[str]
identity: str
tls_config: Optional[ClientTlsConfig]
Expand All @@ -77,7 +77,7 @@ class RpcCall:
rpc: str
req: bytes
retry: bool
metadata: Mapping[str, str]
metadata: Mapping[str, Union[str, bytes]]
timeout_millis: Optional[int]


Expand Down Expand Up @@ -108,7 +108,7 @@ def __init__(
self._runtime = runtime
self._ref = ref

def update_metadata(self, metadata: Mapping[str, str]) -> None:
def update_metadata(self, metadata: Mapping[str, Union[str, bytes]]) -> None:
"""Update underlying metadata on Core client."""
self._ref.update_metadata(metadata)

Expand All @@ -124,7 +124,7 @@ async def call(
req: google.protobuf.message.Message,
resp_type: Type[ProtoMessage],
retry: bool,
metadata: Mapping[str, str],
metadata: Mapping[str, Union[str, bytes]],
timeout: Optional[timedelta],
) -> ProtoMessage:
"""Make RPC call using SDK Core."""
Expand Down
96 changes: 84 additions & 12 deletions temporalio/bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use temporal_client::{
ConfiguredClient, HealthService, HttpConnectProxyOptions, RetryClient, RetryConfig,
TemporalServiceClientWithMetrics, TestService, TlsConfig, WorkflowService,
};
use tonic::metadata::MetadataKey;
use tonic::metadata::{
AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue,
};
use url::Url;

use crate::runtime;
Expand All @@ -28,7 +30,7 @@ pub struct ClientConfig {
target_url: String,
client_name: String,
client_version: String,
metadata: HashMap<String, String>,
metadata: HashMap<String, RpcMetadataValue>,
api_key: Option<String>,
identity: String,
tls_config: Option<ClientTlsConfig>,
Expand Down Expand Up @@ -72,10 +74,18 @@ struct RpcCall {
rpc: String,
req: Vec<u8>,
retry: bool,
metadata: HashMap<String, String>,
metadata: HashMap<String, RpcMetadataValue>,
timeout_millis: Option<u64>,
}

#[derive(FromPyObject)]
enum RpcMetadataValue {
#[pyo3(transparent, annotation = "str")]
Str(String),
#[pyo3(transparent, annotation = "bytes")]
Bytes(Vec<u8>),
}
Comment on lines +81 to +87
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the tests in tests/api/test_grpc_stub.py for what the error message looks like, when an invalid type is specified. I think pyo3 does a good job at producing a reasonably-good error

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 And not too worried about error quality for this rare situation


pub fn connect_client<'a>(
py: Python<'a>,
runtime_ref: &runtime::RuntimeRef,
Expand Down Expand Up @@ -116,8 +126,19 @@ macro_rules! rpc_call_on_trait {

#[pymethods]
impl ClientRef {
fn update_metadata(&self, headers: HashMap<String, String>) {
self.retry_client.get_client().set_headers(headers);
fn update_metadata(&self, headers: HashMap<String, RpcMetadataValue>) -> PyResult<()> {
let (ascii_headers, binary_headers) = partition_headers(headers);

self.retry_client
.get_client()
.set_headers(ascii_headers)
.map_err(|err| PyValueError::new_err(err.to_string()))?;
self.retry_client
.get_client()
.set_binary_headers(binary_headers)
.map_err(|err| PyValueError::new_err(err.to_string()))?;
Comment on lines +132 to +139
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are both fallible and non-atomic 😕. Right now there's a bit of a risk that the client is left in an inconsistent state (in a way that other code could observe) if one of them fails.

I suspect you'll run into this in the other SDKs, too.

I made a test to reproduce/document the issue:

# Setting invalid RPC metadata in a mixed client will partially fail:
client.rpc_metadata = {
"x-my-binary-bin": b"\x00",
"x-my-ascii": "foo",
}
assert client.rpc_metadata == {
"x-my-binary-bin": b"\x00",
"x-my-ascii": "foo",
}
with pytest.raises(
ValueError,
match="Invalid binary header key 'x-invalid-ascii-with-bin-value': invalid gRPC metadata key name",
):
client.rpc_metadata = {
"x-invalid-ascii-with-bin-value": b"not-ascii",
"x-my-ascii": "bar",
}
assert client.rpc_metadata == {
"x-my-binary-bin": b"\x00",
"x-my-ascii": "foo",
}
await client.workflow_service.get_system_info(GetSystemInfoRequest())
workflow_server.assert_last_metadata(
{
"authorization": "Bearer my-api-key",
# This is inconsistent with what `client.rpc_metadata` returns
# (`x-my-ascii` was updated):
"x-my-binary-bin": b"\x00",
"x-my-ascii": "bar",
}
)

Let me know if we should fix this. I think another PR to SDK core that either:

  1. Adds an atomic ConfiguredClient::set_ascii_and_binary_headers(&self, ascii_headers: HashMap<String, String>, binary_headers: HashMap<String, Vec<u8>>) -> Result<(), InvalidHeaderError>
  2. Adds ConfiguredClient::clear_headers(&self) -> () / ConfiguredClient::clear_binary_headers(&self) -> () that can be used to infallibly clear headers, as a way to at least move the client into a consistent state if there any errors.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not that concerned, though arguably we could have changed set_headers to accept both types of headers instead of just one. We can just document this above the rpc metadata setter I think. I appreciate that ascii is done first since I think y'all may be the only one doing binary :-)

Copy link
Contributor Author

@jazev-stripe jazev-stripe Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could have changed set_headers to accept both types of headers instead of just one.

FWIW the reason I didn't do something like that originally is that it would have broken semver on the Core SDK client crate (although maybe I could have done some impl Into<...> magic to make existing callers still work?)

We can just document this above the rpc metadata setter I think.

Ack, I can make that change.

I appreciate that ascii is done first since I think y'all may be the only one doing binary :-)

Heh, fair. Yeah I think if the ASCII is invalid (and setting it fails), this same issue doesn't exist.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the docs in 8b1dd3e:

image


Ok(())
}

fn update_api_key(&self, api_key: Option<String>) {
Expand Down Expand Up @@ -536,12 +557,32 @@ fn rpc_req<P: prost::Message + Default>(call: RpcCall) -> PyResult<tonic::Reques
.map_err(|err| PyValueError::new_err(format!("Invalid proto: {err}")))?;
let mut req = tonic::Request::new(proto);
for (k, v) in call.metadata {
req.metadata_mut().insert(
MetadataKey::from_str(k.as_str())
.map_err(|err| PyValueError::new_err(format!("Invalid metadata key: {err}")))?,
v.parse()
.map_err(|err| PyValueError::new_err(format!("Invalid metadata value: {err}")))?,
);
if let Ok(binary_key) = BinaryMetadataKey::from_str(&k) {
let RpcMetadataValue::Bytes(bytes) = v else {
return Err(PyValueError::new_err(format!(
"Invalid metadata value for binary key {k}: expected bytes"
)));
};

req.metadata_mut()
.insert_bin(binary_key, BinaryMetadataValue::from_bytes(&bytes));
} else {
let ascii_key = AsciiMetadataKey::from_str(&k)
.map_err(|err| PyValueError::new_err(format!("Invalid metadata key: {err}")))?;

let RpcMetadataValue::Str(string) = v else {
return Err(PyValueError::new_err(format!(
"Invalid metadata value for ASCII key {k}: expected str"
)));
};

req.metadata_mut().insert(
ascii_key,
AsciiMetadataValue::from_str(&string).map_err(|err| {
PyValueError::new_err(format!("Invalid metadata value: {err}"))
})?,
);
}
}
if let Some(timeout_millis) = call.timeout_millis {
req.set_timeout(Duration::from_millis(timeout_millis));
Expand All @@ -568,11 +609,41 @@ where
}
}

fn partition_headers(
headers: HashMap<String, RpcMetadataValue>,
) -> (HashMap<String, String>, HashMap<String, Vec<u8>>) {
let (ascii_enum_headers, binary_enum_headers): (HashMap<_, _>, HashMap<_, _>) = headers
.into_iter()
.partition(|(_, v)| matches!(v, RpcMetadataValue::Str(_)));

let ascii_headers = ascii_enum_headers
.into_iter()
.map(|(k, v)| {
let RpcMetadataValue::Str(s) = v else {
unreachable!();
};
(k, s)
})
.collect();
let binary_headers = binary_enum_headers
.into_iter()
.map(|(k, v)| {
let RpcMetadataValue::Bytes(b) = v else {
unreachable!();
};
(k, b)
})
.collect();

(ascii_headers, binary_headers)
}

impl TryFrom<ClientConfig> for ClientOptions {
type Error = PyErr;

fn try_from(opts: ClientConfig) -> PyResult<Self> {
let mut gateway_opts = ClientOptionsBuilder::default();
let (ascii_headers, binary_headers) = partition_headers(opts.metadata);
gateway_opts
.target_url(
Url::parse(&opts.target_url)
Expand All @@ -587,7 +658,8 @@ impl TryFrom<ClientConfig> for ClientOptions {
)
.keep_alive(opts.keep_alive_config.map(Into::into))
.http_connect_proxy(opts.http_connect_proxy_config.map(Into::into))
.headers(Some(opts.metadata))
.headers(Some(ascii_headers))
.binary_headers(Some(binary_headers))
.api_key(opts.api_key);
// Builder does not allow us to set option here, so we have to make
// a conditional to even call it
Expand Down
Loading