Skip to content

Commit 78df5a3

Browse files
committed
Updates
1 parent aebbf46 commit 78df5a3

File tree

5 files changed

+64
-21
lines changed

5 files changed

+64
-21
lines changed

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ futures = "0.1.27"
1010
tokio = "0.1.20"
1111
itertools = "0.8"
1212
failure = "0.1.5"
13-
serde = { version = "1.0.92", optional = true }
13+
serde = { version = "1.0.92", optional = true, features = ["derive"] }
1414
serde_json = { version = "1.0", optional = true }
1515

1616
[features]
17-
serde-orm = ["serde", "serde_json"]
17+
use-serde = ["serde", "serde_json"]
1818

19-
default = ["serde-orm"]
19+
default = ["use-serde"]

src/client/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl InfluxDbClient {
8686
})
8787
}
8888

89-
pub fn query<Q>(self, q: Q) -> Box<dyn Future<Item = String, Error = InfluxDbError>>
89+
pub fn query<Q>(&self, q: Q) -> Box<dyn Future<Item = String, Error = InfluxDbError>>
9090
where
9191
Q: InfluxDbQuery,
9292
{

src/integrations/serde_integration.rs

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,27 @@ struct _DatabaseError {
1818
error: String,
1919
}
2020

21-
pub trait InfluxDbSerdeORM {
22-
fn json_query<T: 'static, Q>(self, q: Q) -> Box<dyn Future<Item = T, Error = InfluxDbError>> where
23-
Q: InfluxDbQuery,
24-
T: DeserializeOwned;
21+
#[derive(Deserialize, Debug)]
22+
#[doc(hidden)]
23+
pub struct DatabaseQueryResult<T> {
24+
pub results: Vec<InfluxDbReturn<T>>,
2525
}
2626

27-
impl InfluxDbSerdeORM for InfluxDbClient {
28-
fn json_query<T: 'static, Q>(self, q: Q) -> Box<dyn Future<Item = T, Error = InfluxDbError>>
27+
#[derive(Deserialize, Debug)]
28+
#[doc(hidden)]
29+
pub struct InfluxDbReturn<T> {
30+
pub series: Option<Vec<InfluxDbSeries<T>>>,
31+
}
32+
33+
#[derive(Deserialize, Debug)]
34+
#[doc(hidden)]
35+
pub struct InfluxDbSeries<T> {
36+
pub name: String,
37+
pub values: Vec<T>,
38+
}
39+
40+
impl InfluxDbClient {
41+
pub fn json_query<T: 'static, Q>(self, q: Q) -> Box<dyn Future<Item = Option<Vec<T>>, Error = InfluxDbError>>
2942
where
3043
Q: InfluxDbQuery,
3144
T: DeserializeOwned,
@@ -43,7 +56,7 @@ impl InfluxDbSerdeORM for InfluxDbClient {
4356
let error = InfluxDbError::UnspecifiedError {
4457
error: format!("{}", err),
4558
};
46-
return Box::new(future::err::<T, InfluxDbError>(error));
59+
return Box::new(future::err::<Option<Vec<T>>, InfluxDbError>(error));
4760
}
4861
Ok(query) => query,
4962
};
@@ -89,18 +102,28 @@ impl InfluxDbSerdeORM for InfluxDbClient {
89102
error: format!("{}", err)
90103
})
91104
.and_then(|body| {
105+
println!("{:?}", &body);
92106
// Try parsing InfluxDBs { "error": "error message here" }
93107
if let Ok(error) = serde_json::from_slice::<_DatabaseError>(&body) {
94108
return futures::future::err(InfluxDbError::DatabaseError {
95109
error: error.error.to_string()
96110
})
97-
} else if let Ok(t_result) = serde_json::from_slice::<T>(&body) {
98-
// Json has another structure, let's try actually parsing it to the type we're deserializing to
99-
return futures::future::result(Ok(t_result));
100111
} else {
101-
return futures::future::err(InfluxDbError::UnspecifiedError {
102-
error: "something wen't wrong during deserializsation of the database response. this might be a bug!".to_string()
103-
})
112+
let from_slice = serde_json::from_slice::<DatabaseQueryResult<T>>(&body);
113+
114+
let mut deserialized = match from_slice {
115+
Ok(deserialized) => deserialized,
116+
Err(err) => return futures::future::err(InfluxDbError::UnspecifiedError {
117+
error: format!("serde error: {}", err)
118+
})
119+
};
120+
121+
// Json has another structure, let's try actually parsing it to the type we're deserializing to
122+
let t_result = match deserialized.results.remove(0).series {
123+
Some(series) => Ok(Some(series.into_iter().flat_map(|x| { x.values }).collect::<Vec<T>>())),
124+
None => Ok(None)
125+
};
126+
return futures::future::result(t_result);
104127
}
105128
})
106129
)

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ pub mod client;
88
pub mod error;
99
pub mod query;
1010

11+
#[cfg(feature = "use-serde")]
1112
pub mod integrations {
12-
#[cfg(feature = "serde-orm")]
13+
#[cfg(feature = "use-serde")]
1314
pub mod serde_integration;
1415
}
1516

tests/integration_tests.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,26 @@ fn test_read() {
4747
let client = create_client();
4848
let query = InfluxDbQuery::raw_read_query("SELECT * FROM weather");
4949
let result = get_runtime().block_on(client.query(query));
50-
println!("{:?}", result);
5150
assert!(result.is_ok(), "Should be no error");
52-
println!("{}", result.unwrap());
53-
}
51+
}
52+
53+
#[test]
54+
#[cfg(feature = "use-serde")]
55+
/// INTEGRATION TEST
56+
///
57+
/// This test case tests whether the InfluxDB server can be connected to and gathers info about it
58+
fn test_json() {
59+
use serde::Deserialize;
60+
61+
#[derive(Deserialize, Debug)]
62+
struct Weather {
63+
time: String,
64+
temperature: i32,
65+
}
66+
67+
let client = create_client();
68+
let query = InfluxDbQuery::raw_read_query("SELECT * FROM weather");
69+
let result = get_runtime().block_on(client.json_query::<Weather, _>(query));
70+
71+
assert!(result.is_ok(), "We could read from the DB");
72+
}

0 commit comments

Comments
 (0)