Merged
5 changes: 4 additions & 1 deletion examples/node_reflector.rs
@@ -24,7 +24,10 @@ async fn main() -> anyhow::Result<()> {
        .default_backoff()
        .reflect(writer)
        .applied_objects()
-       .predicate_filter(predicates::labels.combine(predicates::annotations));
+       .predicate_filter(
+           predicates::labels.combine(predicates::annotations),
+           Default::default(),
+       );
    let mut stream = pin!(stream);

    // Periodically read our state in the background
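Callers happy with the default 1-hour cache TTL can pass Default::default() as above. Below is a minimal sketch of opting into a shorter TTL via the re-exported PredicateConfig builder; the Api handle, resource type, and TTL value are illustrative, not part of the PR:

use std::{pin::pin, time::Duration};

use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Node;
use kube::{
    runtime::{predicates, watcher, Predicate, PredicateConfig, WatchStreamExt},
    Api, Client,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = Client::try_default().await?;
    let nodes: Api<Node> = Api::all(client);

    // Same label/annotation predicate as in the example above, but cache
    // entries are evicted after 10 minutes instead of the 1-hour default.
    let stream = watcher(nodes, watcher::Config::default())
        .applied_objects()
        .predicate_filter(
            predicates::labels.combine(predicates::annotations),
            PredicateConfig::default().ttl(Duration::from_secs(600)),
        );
    let mut stream = pin!(stream);
    while let Some(node) = stream.try_next().await? {
        println!("saw node {:?}", node.metadata.name);
    }
    Ok(())
}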
2 changes: 1 addition & 1 deletion examples/pod_reflector.rs
@@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> {
        })
        .reflect(writer)
        .applied_objects()
-       .predicate_filter(predicates::resource_version);
+       .predicate_filter(predicates::resource_version, Default::default());
    let mut stream = pin!(stream);

    while let Some(pod) = stream.try_next().await? {
2 changes: 1 addition & 1 deletion examples/shared_stream_controllers.rs
@@ -62,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
    let filtered = subscriber
        .clone()
        .map(|r| Ok(r.deref().clone()))
-       .predicate_filter(predicates::resource_version)
+       .predicate_filter(predicates::resource_version, Default::default())
        .filter_map(|r| future::ready(r.ok().map(Arc::new)));

    // Reflect a stream of pod watch events into the store and apply a backoff. For subscribers to
6 changes: 3 additions & 3 deletions kube-runtime/src/controller/mod.rs
@@ -730,7 +730,7 @@ where
    ///     .default_backoff()
    ///     .reflect(writer)
    ///     .applied_objects()
-   ///     .predicate_filter(predicates::generation);
+   ///     .predicate_filter(predicates::generation, Default::default());
    ///
    /// Controller::for_stream(deploys, reader)
    ///     .run(reconcile, error_policy, Arc::new(()))
@@ -993,7 +993,7 @@
    /// # async fn doc(client: kube::Client) {
    /// let sts_stream = metadata_watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
    ///     .touched_objects()
-   ///     .predicate_filter(predicates::generation);
+   ///     .predicate_filter(predicates::generation, Default::default());
    ///
    /// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
    ///     .owns_stream(sts_stream)
@@ -1271,7 +1271,7 @@
    /// let cr: Api<CustomResource> = Api::all(client.clone());
    /// let daemons = watcher(api, watcher::Config::default())
    ///     .touched_objects()
-   ///     .predicate_filter(predicates::generation);
+   ///     .predicate_filter(predicates::generation, Default::default());
    ///
    /// Controller::new(cr, watcher::Config::default())
    ///     .watches_stream(daemons, mapper)
2 changes: 1 addition & 1 deletion kube-runtime/src/lib.rs
@@ -37,5 +37,5 @@ pub use scheduler::scheduler;
pub use utils::WatchStreamExt;
pub use watcher::{metadata_watcher, watcher};

-pub use utils::{predicates, Predicate};
+pub use utils::{predicates, Predicate, PredicateConfig};
pub use wait::conditions;
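With this re-export, downstream code can name the config type from kube::runtime rather than reaching into the private utils path. A small sketch; the helper function is hypothetical:

use std::time::Duration;

use kube::runtime::PredicateConfig;

// Hypothetical convenience: a predicate config with a 30-minute TTL.
fn short_ttl_config() -> PredicateConfig {
    PredicateConfig::default().ttl(Duration::from_secs(1800))
}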
2 changes: 1 addition & 1 deletion kube-runtime/src/utils/mod.rs
@@ -12,7 +12,7 @@ mod watch_ext;
pub use backoff_reset_timer::{Backoff, ResetTimerBackoff};
pub use event_decode::EventDecode;
pub use event_modify::EventModify;
-pub use predicate::{predicates, Predicate, PredicateFilter};
+pub use predicate::{predicates, Config as PredicateConfig, Predicate, PredicateFilter};
pub use reflect::Reflect;
pub use stream_backoff::StreamBackoff;
pub use watch_ext::WatchStreamExt;
135 changes: 128 additions & 7 deletions kube-runtime/src/utils/predicate.rs
@@ -10,6 +10,7 @@ use std::{
    collections::{hash_map::DefaultHasher, HashMap},
    hash::{Hash, Hasher},
    marker::PhantomData,
+   time::{Duration, Instant},
};

fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
@@ -114,6 +115,44 @@
    }
}

+/// Configuration for predicate filtering with cache TTL
+#[derive(Debug, Clone)]
+pub struct Config {
+    /// Time-to-live for cache entries
+    ///
+    /// Entries not seen for at least this long will be evicted from the cache.
+    /// Default is 1 hour.
+    ttl: Duration,
+}
+
+impl Config {
+    /// Set the time-to-live for cache entries
+    ///
+    /// Entries not seen for at least this long will be evicted from the cache.
+    #[must_use]
+    pub fn ttl(mut self, ttl: Duration) -> Self {
+        self.ttl = ttl;
+        self
+    }
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            // Default to 1 hour TTL - long enough to avoid unnecessary reconciles
+            // but short enough to prevent unbounded memory growth
Comment on lines +142 to +143
Member:
I am a bit uncertain about this value still. Because we bump the interval every time we see an object, if we see the object again during relists every 5m, then this setup (and value) works great because we will in practice never see the same object again unless the hash/uid changes.

However, I am noticing some differences in behaviour with long watches with and without using .streaming_lists(), and would like to confirm whether the behavior is correct there first.

@doxxx93 (Contributor, Author), Oct 25, 2025:

Good point. Since we update 'last_seen' on every encounter, objects that reappear during periodic relists will keep their timestamps fresh and won't expire.

Regarding the 'streaming_lists()' behavior - are you planning to verify this yourself, or would you like me to test both strategies (ListWatch vs StreamingList) to see how frequently objects are re-encountered? I can adjust the default TTL based on the findings.

@doxxx93 (Contributor, Author):

If you'd like me to test this, I'm thinking of something like:

// Track when we see each pod to measure relist frequency
let mut seen_times: HashMap<String, Vec<Instant>> = HashMap::new();

watcher(api, watcher_config)
    .applied_objects()
    .predicate_filter(predicates::generation, Default::default())
    .try_for_each(|p| async {
        let name = p.name_any();
        let now = Instant::now();

        if let Some(last_seen) = seen_times.get(&name).and_then(|v| v.last()) {
            let elapsed = now.duration_since(*last_seen);
            info!("Pod {} re-encountered after {:?}", name, elapsed);
        }
        seen_times.entry(name).or_insert(vec![]).push(now);
        Ok(())
    })
    .await?;

Run this for ~10 minutes with both Config::default() (ListWatch) and Config::default().streaming_lists() to compare how often objects are re-sent.

Expected behavior (my assumption):

  • ListWatch: Objects re-appear during periodic relists (every ~5min?) → last_seen keeps updating → 1 hour TTL means entries effectively never expire for active resources
  • StreamingList: Objects might only appear once unless changed → last_seen doesn't update → TTL actually matters for eviction

Though I'm not entirely certain about StreamingList's relist behavior, so please let me know if this test approach makes sense or if you have better insights on this.

Member:

Yeah, I was going to have a look at it. My approach is similar. I don't think we even need to include predicates to see it. Running this modified example of pod_watcher:

use futures::prelude::*;
use kube::{
    Client,
    api::{Api, ResourceExt},
    runtime::{WatchStreamExt, watcher},
};
use tracing::*;

type X = k8s_openapi::api::networking::v1::Ingress;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();
    let client = Client::try_default().await?;
    let api = Api::<X>::default_namespaced(client);
    let use_watchlist = std::env::var("WATCHLIST").map(|s| s == "1").unwrap_or(false);
    let wc = if use_watchlist {
        // requires WatchList feature gate on 1.27 or later
        watcher::Config::default().streaming_lists()
    } else {
        watcher::Config::default()
    };

    watcher(api, wc)
        .applied_objects()
        .default_backoff()
        .try_for_each(|p| async move {
            info!("saw {}", p.name_any());
            Ok(())
        })
        .await?;
    Ok(())
}

and seeing re-updates from streaming_lists:

WATCHLIST=1 cargo run --example pod_watcher
    Blocking waiting for file lock on build directory
   Compiling kube-examples v2.0.1 (/home/clux/kube/kube/examples)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 3.44s
     Running `/home/clux/kube/kube/target/debug/examples/pod_watcher`
2025-10-25T19:51:33.102534Z  INFO pod_watcher: saw five-e
2025-10-25T19:56:23.104634Z  INFO pod_watcher: saw five-e
2025-10-25T20:01:13.106317Z  INFO pod_watcher: saw five-e
2025-10-25T20:06:03.107981Z  INFO pod_watcher: saw five-e

but NOT for the old way:

cargo run --example pod_watcher
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.12s
     Running `/home/clux/kube/kube/target/debug/examples/pod_watcher`
2025-10-25T20:06:58.630438Z  INFO pod_watcher: saw five-e
... no further updates

Unfortunately, I am not sure if this is a bug yet.

+            ttl: Duration::from_secs(3600),
+        }
+    }
+}
+
+/// Cache entry storing predicate hash and last access time
+#[derive(Debug, Clone)]
+struct CacheEntry {
+    hash: u64,
+    last_seen: Instant,
+}

#[allow(clippy::pedantic)]
#[pin_project]
/// Stream returned by the [`predicate_filter`](super::WatchStreamExt::predicate_filter) method.
@@ -122,7 +161,8 @@ pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
    #[pin]
    stream: St,
    predicate: P,
-   cache: HashMap<PredicateCacheKey, u64>,
+   cache: HashMap<PredicateCacheKey, CacheEntry>,
+   config: Config,
    // K: Resource necessary to get .meta() of an object during polling
    _phantom: PhantomData<K>,
}
@@ -132,11 +172,12 @@ where
    K: Resource,
    P: Predicate<K>,
{
-   pub(super) fn new(stream: St, predicate: P) -> Self {
+   pub(super) fn new(stream: St, predicate: P, config: Config) -> Self {
        Self {
            stream,
            predicate,
            cache: HashMap::new(),
+           config,
            _phantom: PhantomData,
        }
    }
@@ -152,13 +193,29 @@

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut me = self.project();
+
+       // Evict expired entries before processing new events
+       let now = Instant::now();
+       let ttl = me.config.ttl;
+       me.cache
+           .retain(|_, entry| now.duration_since(entry.last_seen) < ttl);
+
        Poll::Ready(loop {
            break match ready!(me.stream.as_mut().poll_next(cx)) {
                Some(Ok(obj)) => {
                    if let Some(val) = me.predicate.hash_property(&obj) {
                        let key = PredicateCacheKey::from(obj.meta());
-                       let changed = me.cache.get(&key) != Some(&val);
-                       me.cache.insert(key, val);
+                       let now = Instant::now();
+
+                       // Check if the predicate value changed or entry doesn't exist
+                       let changed = me.cache.get(&key).map(|entry| entry.hash) != Some(val);
+
+                       // Upsert the cache entry with new hash and timestamp
+                       me.cache.insert(key, CacheEntry {
+                           hash: val,
+                           last_seen: now,
+                       });
+
                        if changed {
                            Some(Ok(obj))
                        } else {
@@ -216,7 +273,7 @@ pub mod predicates {
pub(crate) mod tests {
    use std::{pin::pin, task::Poll};

-   use super::{predicates, Error, PredicateFilter};
+   use super::{predicates, Config, Error, PredicateFilter};
    use futures::{poll, stream, FutureExt, StreamExt};
    use kube_client::Resource;
    use serde_json::json;
@@ -248,7 +305,11 @@
            Ok(mkobj(1)),
            Ok(mkobj(2)),
        ]);
-       let mut rx = pin!(PredicateFilter::new(data, predicates::generation));
+       let mut rx = pin!(PredicateFilter::new(
+           data,
+           predicates::generation,
+           Config::default()
+       ));

        // mkobj(1) passed through
        let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
@@ -299,7 +360,11 @@
            Ok(mkobj(1, "uid-2")),
            Ok(mkobj(2, "uid-3")),
        ]);
-       let mut rx = pin!(PredicateFilter::new(data, predicates::generation));
+       let mut rx = pin!(PredicateFilter::new(
+           data,
+           predicates::generation,
+           Config::default()
+       ));

        // mkobj(1, uid-1) passed through
        let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
@@ -319,4 +384,60 @@

        assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
    }
+
+   #[tokio::test]
+   async fn predicate_cache_ttl_evicts_expired_entries() {
+       use futures::{channel::mpsc, SinkExt};
+       use k8s_openapi::api::core::v1::Pod;
+       use std::time::Duration;
+
+       let mkobj = |g: i32, uid: &str| {
+           let p: Pod = serde_json::from_value(json!({
+               "apiVersion": "v1",
+               "kind": "Pod",
+               "metadata": {
+                   "name": "blog",
+                   "namespace": "default",
+                   "generation": Some(g),
+                   "uid": uid,
+               },
+               "spec": {
+                   "containers": [{
+                       "name": "blog",
+                       "image": "clux/blog:0.1.0"
+                   }],
+               }
+           }))
+           .unwrap();
+           p
+       };
+
+       // Use a very short TTL for testing
+       let config = Config::default().ttl(Duration::from_millis(50));
+
+       // Use a channel so we can send items with delays
+       let (mut tx, rx) = mpsc::unbounded();
+       let mut filtered = pin!(PredicateFilter::new(
+           rx.map(Ok::<_, Error>),
+           predicates::generation,
+           config
+       ));
+
+       // Send first object
+       tx.send(mkobj(1, "uid-1")).await.unwrap();
+       let first = filtered.next().now_or_never().unwrap().unwrap().unwrap();
+       assert_eq!(first.meta().generation, Some(1));
+
+       // Send same object immediately - should be filtered
+       tx.send(mkobj(1, "uid-1")).await.unwrap();
+       assert!(matches!(poll!(filtered.next()), Poll::Pending));
+
+       // Wait for TTL to expire
+       tokio::time::sleep(Duration::from_millis(100)).await;
+
+       // Send same object after TTL - should pass through due to eviction
+       tx.send(mkobj(1, "uid-1")).await.unwrap();
+       let second = filtered.next().now_or_never().unwrap().unwrap().unwrap();
+       assert_eq!(second.meta().generation, Some(1));
+   }
}
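The eviction scheme this test exercises is just HashMap::retain over a timestamped entry, as added at the top of poll_next above. A self-contained sketch of the same mechanism outside the stream machinery; all names here are illustrative, not part of the crate:

use std::{
    collections::HashMap,
    time::{Duration, Instant},
};

// Mirrors the PR's CacheEntry: a predicate hash plus the last time it was seen.
struct Entry {
    hash: u64,
    last_seen: Instant,
}

// Same shape as the retain call in poll_next: drop every entry that has
// not been seen within the TTL window.
fn evict_expired(cache: &mut HashMap<String, Entry>, ttl: Duration) {
    let now = Instant::now();
    cache.retain(|_, e| now.duration_since(e.last_seen) < ttl);
}

fn main() {
    let mut cache = HashMap::new();
    cache.insert("pod-a".into(), Entry { hash: 42, last_seen: Instant::now() });
    evict_expired(&mut cache, Duration::from_secs(3600));
    // Fresh entries survive; anything older than the TTL would have been dropped.
    assert_eq!(cache.get("pod-a").map(|e| e.hash), Some(42));
}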
13 changes: 8 additions & 5 deletions kube-runtime/src/utils/watch_ext.rs
@@ -2,7 +2,7 @@ use crate::{
    utils::{
        event_decode::EventDecode,
        event_modify::EventModify,
-       predicate::{Predicate, PredicateFilter},
+       predicate::{Config as PredicateConfig, Predicate, PredicateFilter},
        stream_backoff::StreamBackoff,
    },
    watcher,
@@ -99,6 +99,9 @@ pub trait WatchStreamExt: Stream {
    /// Common use case for this is to avoid repeat events for status updates
    /// by filtering on [`predicates::generation`](crate::predicates::generation).
    ///
+   /// The cache entries have a configurable time-to-live (TTL) to prevent unbounded
+   /// memory growth. By default, entries expire after 1 hour.
+   ///
    /// ## Usage
    /// ```no_run
    /// # use std::pin::pin;
@@ -111,21 +114,21 @@
    /// let deploys: Api<Deployment> = Api::default_namespaced(client);
    /// let mut changed_deploys = pin!(watcher(deploys, watcher::Config::default())
    ///     .applied_objects()
-   ///     .predicate_filter(predicates::generation));
+   ///     .predicate_filter(predicates::generation, Default::default()));
    ///
    /// while let Some(d) = changed_deploys.try_next().await? {
    ///     println!("saw Deployment '{}' with hitherto unseen generation", d.name_any());
    /// }
    /// # Ok(())
    /// # }
    /// ```
-   fn predicate_filter<K, P>(self, predicate: P) -> PredicateFilter<Self, K, P>
+   fn predicate_filter<K, P>(self, predicate: P, config: PredicateConfig) -> PredicateFilter<Self, K, P>
    where
        Self: Stream<Item = Result<K, watcher::Error>> + Sized,
        K: Resource + 'static,
        P: Predicate<K> + 'static,
    {
-       PredicateFilter::new(self, predicate)
+       PredicateFilter::new(self, predicate, config)
    }

    /// Reflect a [`watcher()`] stream into a [`Store`] through a [`Writer`]
@@ -301,7 +304,7 @@
    fn test_watcher_stream_type_drift() {
        let pred_watch = watcher(compile_type::<Api<Pod>>(), Default::default())
            .touched_objects()
-           .predicate_filter(predicates::generation)
+           .predicate_filter(predicates::generation, Default::default())
            .boxed();
        assert_stream(pred_watch);
    }