Skip to content

Commit ef47887

Browse files
committed
expose list/watch parameters - fixes ynqa#11
1 parent 0106fe8 commit ef47887

12 files changed

+287
-156
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
0.7.0 / UNRELEASED
2+
==================
3+
* Expose list/watch parameters #11
4+
5+
16
0.6.0 / 2019-05-12
27
==================
38
* Expose getter `Informer::version`

README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ To use it, you just feed in `T` as a `Spec` struct and `U` as a `Status` struct,
2525
```rust
2626
use k8s_openapi::api::core::v1::{PodSpec, PodStatus};
2727
let resource = ResourceType::Pods(Some("kube-system".into()));
28-
let rf : Reflector<PodSpec, PodStatus> = Reflector::new(client.clone(), resource.into())?;
28+
let rf : Reflector<PodSpec, PodStatus> = Reflector::new(client.clone(), resource.into())?
29+
.timeout(10)
30+
.init();
2931
```
3032

3133
then you can `poll()` the reflector, and `read()` to get the current cached state:
@@ -53,7 +55,8 @@ You tell it what type parameters correspond to; `T` should be a `Spec` struct, a
5355
```rust
5456
use k8s_openapi::api::core::v1::{PodSpec, PodStatus};
5557
let resource = ResourceType::Pods(Some("kube-system".into()));
56-
let inf : Informer<PodSpec, PodStatus> = Informer::new(client.clone(), resource.into())?;
58+
let inf : Informer<PodSpec, PodStatus> = Informer::new(client.clone(), resource.into())
59+
.init()?;
5760
```
5861

5962
The main feature of `Informer<T, U>` is that after calling `.poll()` you handle the events and decide what to do with them yourself:
@@ -120,7 +123,7 @@ cargo run --example crd_reflector
120123
then you can `kubectl apply -f crd-baz.yaml -n kube-system`, or `kubectl delete -f crd-baz.yaml -n kube-system`, or `kubectl edit foos baz -n kube-system` to verify that the events are being picked up.
121124

122125
## Timing
123-
All watch calls have timeouts set to `10` seconds as a default (and kube always waits that long regardless of activity). If you like to hammer the API less, call `.poll()` less often.
126+
All watch calls have timeouts set to `10` seconds as a default (and kube always waits that long regardless of activity). If you like to hammer the API less, you can call `.poll()` less often. But the kube api holds for the full timeout value anyway and you can set it with `.timeout(n)`.
124127

125128
## License
126129
Apache 2.0 licensed. See LICENSE for details.

examples/crd_reflector.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ fn main() -> Result<(), failure::Error> {
2828
namespace: Some("kube-system".into()),
2929
..Default::default()
3030
};
31-
let rf : Reflector<FooResource, Void> = Reflector::new(client, resource)?;
31+
let rf : Reflector<FooResource, Void> = Reflector::new(client, resource)
32+
.init()?;
3233

3334
loop {
3435
// Update internal state by calling watch (blocks):

examples/deployment_reflector.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ fn main() -> Result<(), failure::Error> {
1313
let client = APIClient::new(config);
1414

1515
let resource = ResourceType::Deploys(Some("kube-system".into()));
16-
let rf : Reflector<DeploymentSpec, DeploymentStatus> = Reflector::new(client, resource.into())?;
16+
let rf : Reflector<DeploymentSpec, DeploymentStatus> =
17+
Reflector::new(client, resource.into())
18+
.init()?;
1719

1820
// rf is initialized with full state, which can be extracted on demand.
1921
// Output is Map of name -> Deployment

examples/node_informer.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ fn main() -> Result<(), failure::Error> {
1616
let client = APIClient::new(config);
1717

1818
let nodes = ResourceType::Nodes;
19-
let ni : Informer<NodeSpec, NodeStatus> = Informer::new(client.clone(), nodes.into())?;
19+
let ni = Informer::new(client.clone(), nodes.into())
20+
.labels("role=worker")
21+
.init()?;
2022

2123
loop {
2224
ni.poll()?;
@@ -34,6 +36,7 @@ fn handle_nodes(client: &APIClient, ev: WatchEvent<NodeSpec, NodeStatus>) -> Res
3436
info!("New Node: {}", o.spec.provider_id.unwrap());
3537
},
3638
WatchEvent::Modified(o) => {
39+
// Nodes often modify a lot - only print broken nodes
3740
if let Some(true) = o.spec.unschedulable {
3841
let failed = o.status.conditions.unwrap().into_iter().filter(|c| {
3942
// In a failed state either some of the extra conditions are not False
@@ -51,6 +54,9 @@ fn handle_nodes(client: &APIClient, ev: WatchEvent<NodeSpec, NodeStatus>) -> Res
5154
let req = Event::list_event_for_all_namespaces(opts)?.0;
5255
let res = client.request::<Event>(req)?;
5356
warn!("Node events: {:?}", res);
57+
} else {
58+
// Turn up logging above to see
59+
debug!("Normal node: {}", o.metadata.name);
5460
}
5561
},
5662
WatchEvent::Deleted(o) => {

examples/node_reflector.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ fn main() -> Result<(), failure::Error> {
1313
let client = APIClient::new(config);
1414

1515
let resource = ResourceType::Nodes;
16-
let rf : Reflector<NodeSpec, NodeStatus> = Reflector::new(client, resource.into())?;
16+
let rf : Reflector<NodeSpec, NodeStatus> = Reflector::new(client, resource.into())
17+
.labels("role=master")
18+
.init()?;
1719

1820
// rf is initialized with full state, which can be extracted on demand.
1921
// Output is Map of name -> Node

examples/pod_informer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ fn main() -> Result<(), failure::Error> {
1515
let namespace = Some(env::var("NAMESPACE").unwrap_or("kube-system".into()));
1616

1717
let resource = ResourceType::Pods(namespace);
18-
let inf : Informer<PodSpec, PodStatus> = Informer::new(client.clone(), resource.into())?;
18+
let inf : Informer<PodSpec, PodStatus> = Informer::new(client.clone(), resource.into())
19+
.init()?;
1920

2021
// Here we both poll and reconcile based on events from the main thread
2122
// If you run this next to actix-web (say), spawn a thread and pass `inf` as app state

examples/pod_reflector.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ fn main() -> Result<(), failure::Error> {
1313
let client = APIClient::new(config);
1414

1515
let resource = ResourceType::Pods(Some("kube-system".into()));
16-
let rf : Reflector<PodSpec, PodStatus> = Reflector::new(client.clone(), resource.into())?;
16+
let rf : Reflector<PodSpec, PodStatus> = Reflector::new(client.clone(), resource.into())
17+
.init()?;
1718

1819
// Can read initial state now:
1920
rf.read()?.into_iter().for_each(|(name, p)| {

src/api/informer.rs

Lines changed: 89 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::api::resource::{
22
ResourceList,
33
WatchEvent,
44
ApiResource,
5+
QueryParams,
56
};
67
use crate::client::APIClient;
78
use crate::{Result};
@@ -31,48 +32,85 @@ pub struct Informer<T, U> where
3132
version: Arc<RwLock<String>>,
3233
client: APIClient,
3334
resource: ApiResource,
35+
params: QueryParams,
3436
}
3537

3638
impl<T, U> Informer<T, U> where
3739
T: Clone + DeserializeOwned,
3840
U: Clone + DeserializeOwned,
3941
{
4042
/// Create a reflector with a kube client on a kube resource
41-
///
42-
/// Initializes resourceVersion with a 1 limit list call
43-
pub fn new(client: APIClient, r: ApiResource) -> Result<Self> {
44-
info!("Creating Informer for {:?}", r);
45-
let initial = get_resource_version(&client, &r)?;
46-
Ok(Informer {
43+
pub fn new(client: APIClient, r: ApiResource) -> Self {
44+
Informer {
4745
client,
4846
resource: r,
47+
params: QueryParams::default(),
4948
events: Arc::new(RwLock::new(VecDeque::new())),
50-
version: Arc::new(RwLock::new(initial)),
51-
})
49+
version: Arc::new(RwLock::new(0.to_string())),
50+
}
5251
}
5352

54-
/// Create a reflector with a kube client on a kube resource
53+
/// Configure the timeout for the list/watch call.
5554
///
56-
/// Initializes resourceVersion from a passed in value
57-
pub fn from_version(client: APIClient, r: ApiResource, v: String) -> Result<Self> {
58-
info!("Creating Informer for {:?}", r);
59-
Ok(Informer {
60-
client,
61-
resource: r,
62-
events: Arc::new(RwLock::new(VecDeque::new())),
63-
version: Arc::new(RwLock::new(v)),
64-
})
55+
/// This limits the duration of the call, regardless of any activity or inactivity.
56+
/// Defaults to 10s
57+
pub fn timeout(mut self, timeout_secs: u32) -> Self {
58+
self.params.timeout = Some(timeout_secs);
59+
self
60+
}
61+
62+
/// Configure the selector to restrict the list of returned objects by their fields.
63+
///
64+
/// Defaults to everything.
65+
/// Supports '=', '==', and '!=', and can comma separate: key1=value1,key2=value2
66+
/// The server only supports a limited number of field queries per type.
67+
pub fn fields(mut self, field_selector: &str) -> Self {
68+
self.params.field_selector = Some(field_selector.to_string());
69+
self
70+
}
71+
72+
/// Configure the selector to restrict the list of returned objects by their labels.
73+
///
74+
/// Defaults to everything.
75+
/// Supports '=', '==', and '!=', and can comma separate: key1=value1,key2=value2
76+
pub fn labels(mut self, label_selector: &str) -> Self {
77+
self.params.label_selector = Some(label_selector.to_string());
78+
self
79+
}
80+
81+
/// If called, partially initialized resources are included in watch/list responses.
82+
pub fn include_uninitialized(mut self) -> Self {
83+
self.params.include_uninitialized = true;
84+
self
85+
}
86+
87+
88+
/// Initialize without a prior version
89+
///
90+
/// Will seed resourceVersion with a 1 limit list call to the resource
91+
pub fn init(self) -> Result<Self> {
92+
let initial = self.get_resource_version()?;
93+
info!("Starting Informer for {:?}", self.resource);
94+
*self.version.write().unwrap() = initial;
95+
Ok(self)
96+
}
97+
98+
/// Initialize from a prior version
99+
pub fn init_from(self, v: String) -> Self {
100+
info!("Recreating Informer for {:?} at {}", self.resource, v);
101+
*self.version.write().unwrap() = v;
102+
self
65103
}
66104

105+
67106
/// Run a single watch poll
68107
///
69108
/// If this returns an error, it resets the resourceVersion.
70109
/// This is meant to be run continually and events are meant to be handled between.
71110
/// If handling all the events is too time consuming, you probably need a queue.
72111
pub fn poll(&self) -> Result<()> {
73112
trace!("Watching {:?}", self.resource);
74-
let oldver = self.version();
75-
match watch_for_resource_updates(&self.client, &self.resource, &oldver) {
113+
match self.single_watch() {
76114
Ok((events, newver)) => {
77115
*self.version.write().unwrap() = newver;
78116
for e in events {
@@ -97,7 +135,7 @@ impl<T, U> Informer<T, U> where
97135
/// Reset the resourceVersion to current and clear the event queue
98136
pub fn reset(&self) -> Result<()> {
99137
// Fetch a new initial version:
100-
let initial = get_resource_version(&self.client, &self.resource)?;
138+
let initial = self.get_resource_version()?;
101139
*self.version.write().unwrap() = initial;
102140
self.events.write().unwrap().clear();
103141
Ok(())
@@ -107,42 +145,40 @@ impl<T, U> Informer<T, U> where
107145
pub fn version(&self) -> String {
108146
self.version.read().unwrap().clone()
109147
}
110-
}
111148

112-
fn get_resource_version(client: &APIClient, rg: &ApiResource) -> Result<String>
113-
{
114-
let req = rg.list_zero_resource_entries()?;
115149

116-
// parse to void a ResourceList into void except for Metadata
117-
#[derive(Clone, Deserialize)]
118-
struct Discard {} // ffs
119-
let res = client.request::<ResourceList<Option<Discard>>>(req)?;
150+
/// Init helper
151+
fn get_resource_version(&self) -> Result<String> {
152+
let req = self.resource.list_zero_resource_entries(&self.params)?;
120153

121-
let version = res.metadata.resourceVersion.unwrap_or_else(|| "0".into());
122-
debug!("Got fresh resourceVersion={} for {}", version, rg.resource);
123-
Ok( version )
124-
}
154+
// parse to void a ResourceList into void except for Metadata
155+
#[derive(Clone, Deserialize)]
156+
struct Discard {} // ffs
157+
let res = self.client.request::<ResourceList<Option<Discard>>>(req)?;
125158

159+
let version = res.metadata.resourceVersion.unwrap_or_else(|| "0".into());
160+
debug!("Got fresh resourceVersion={} for {}", version, self.resource.resource);
161+
Ok( version )
162+
}
126163

127-
fn watch_for_resource_updates<T, U>(client: &APIClient, rg: &ApiResource, ver: &str)
128-
-> Result<(Vec<WatchEvent<T, U>>, String)> where
129-
T: Clone + DeserializeOwned,
130-
U: Clone + DeserializeOwned,
131-
{
132-
let req = rg.watch_resource_entries_after(ver)?;
133-
let events = client.request_events::<WatchEvent<T, U>>(req)?;
134-
135-
// Follow docs conventions and store the last resourceVersion
136-
// https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
137-
let newver = events.iter().filter_map(|e| {
138-
match e {
139-
WatchEvent::Added(o) => o.metadata.resourceVersion.clone(),
140-
WatchEvent::Modified(o) => o.metadata.resourceVersion.clone(),
141-
WatchEvent::Deleted(o) => o.metadata.resourceVersion.clone(),
142-
_ => None
143-
}
144-
}).last().unwrap_or_else(|| ver.into());
145-
debug!("Got {} {} events, resourceVersion={}", events.len(), rg.resource, newver);
164+
/// Watch helper
165+
fn single_watch(&self) -> Result<(Vec<WatchEvent<T, U>>, String)> {
166+
let oldver = self.version();
167+
let req = self.resource.watch_resource_entries_after(&self.params, &oldver)?;
168+
let events = self.client.request_events::<WatchEvent<T, U>>(req)?;
169+
170+
// Follow docs conventions and store the last resourceVersion
171+
// https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
172+
let newver = events.iter().filter_map(|e| {
173+
match e {
174+
WatchEvent::Added(o) => o.metadata.resourceVersion.clone(),
175+
WatchEvent::Modified(o) => o.metadata.resourceVersion.clone(),
176+
WatchEvent::Deleted(o) => o.metadata.resourceVersion.clone(),
177+
_ => None
178+
}
179+
}).last().unwrap_or_else(|| oldver.into());
180+
debug!("Got {} {} events, resourceVersion={}", events.len(), self.resource.resource, newver);
146181

147-
Ok((events, newver))
182+
Ok((events, newver))
183+
}
148184
}

src/api/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub use self::resource::{
1919
Resource,
2020
ApiResource,
2121
ResourceType,
22+
QueryParams,
2223
WatchEvent,
2324
ApiError,
2425
};

0 commit comments

Comments
 (0)