Skip to content

Commit 714ef5a

Browse files
authored
Crude Read Support (#3)
Crude Read Support
2 parents 1d253a1 + 8a98d38 commit 714ef5a

File tree

10 files changed

+652
-293
lines changed

10 files changed

+652
-293
lines changed

Cargo.lock

Lines changed: 8 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,10 @@ futures = "0.1.27"
1010
tokio = "0.1.20"
1111
itertools = "0.8"
1212
failure = "0.1.5"
13+
serde = { version = "1.0.92", optional = true, features = ["derive"] }
14+
serde_json = { version = "1.0", optional = true }
15+
16+
[features]
17+
use-serde = ["serde", "serde_json"]
18+
19+
default = ["use-serde"]

src/client/mod.rs

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
use futures::{Future, Stream};
2+
use reqwest::r#async::{Client, Decoder};
3+
4+
use std::mem;
5+
6+
use crate::error::InfluxDbError;
7+
use crate::query::{InfluxDbQuery, QueryType};
8+
9+
/// Client which can read and write data from InfluxDB.
10+
///
11+
/// # Arguments
12+
///
13+
/// * `url`: The URL where InfluxDB is running (ex. `http://localhost:8086`).
14+
/// * `database`: The Database against which queries and writes will be run.
15+
///
16+
/// # Examples
17+
///
18+
/// ```rust
19+
/// use influxdb::client::InfluxDbClient;
20+
///
21+
/// let client = InfluxDbClient::new("http://localhost:8086", "test");
22+
///
23+
/// assert_eq!(client.database_name(), "test");
24+
/// ```
25+
pub struct InfluxDbClient {
26+
url: String,
27+
database: String,
28+
// auth: Option<InfluxDbAuthentication>
29+
}
30+
31+
impl InfluxDbClient {
32+
/// Instantiates a new [`InfluxDbClient`]
33+
///
34+
/// # Arguments
35+
///
36+
/// * `url`: The URL where InfluxDB is running (ex. `http://localhost:8086`).
37+
/// * `database`: The Database against which queries and writes will be run.
38+
///
39+
/// # Examples
40+
///
41+
/// ```rust
42+
/// use influxdb::client::InfluxDbClient;
43+
///
44+
/// let _client = InfluxDbClient::new("http://localhost:8086", "test");
45+
/// ```
46+
pub fn new<S>(url: S, database: S) -> Self
47+
where
48+
S: Into<String>,
49+
{
50+
InfluxDbClient {
51+
url: url.into(),
52+
database: database.into(),
53+
}
54+
}
55+
56+
pub fn database_name<'a>(&'a self) -> &'a str {
57+
&self.database
58+
}
59+
60+
pub fn database_url<'a>(&'a self) -> &'a str {
61+
&self.url
62+
}
63+
64+
pub fn ping(&self) -> impl Future<Item = (String, String), Error = InfluxDbError> {
65+
Client::new()
66+
.get(format!("{}/ping", self.url).as_str())
67+
.send()
68+
.map(|res| {
69+
let build = res
70+
.headers()
71+
.get("X-Influxdb-Build")
72+
.unwrap()
73+
.to_str()
74+
.unwrap();
75+
let version = res
76+
.headers()
77+
.get("X-Influxdb-Version")
78+
.unwrap()
79+
.to_str()
80+
.unwrap();
81+
82+
(String::from(build), String::from(version))
83+
})
84+
.map_err(|err| InfluxDbError::UnspecifiedError {
85+
error: format!("{}", err),
86+
})
87+
}
88+
89+
pub fn query<Q>(&self, q: Q) -> Box<dyn Future<Item = String, Error = InfluxDbError>>
90+
where
91+
Q: InfluxDbQuery,
92+
{
93+
use futures::future;
94+
95+
let query_type = q.get_type();
96+
let endpoint = match query_type {
97+
QueryType::ReadQuery => "query",
98+
QueryType::WriteQuery => "write",
99+
};
100+
101+
let query = match q.build() {
102+
Err(err) => {
103+
let error = InfluxDbError::UnspecifiedError {
104+
error: format!("{}", err),
105+
};
106+
return Box::new(future::err::<String, InfluxDbError>(error));
107+
}
108+
Ok(query) => query,
109+
};
110+
111+
let query_str = query.get();
112+
let url_params = match query_type {
113+
QueryType::ReadQuery => format!("&q={}", query_str),
114+
QueryType::WriteQuery => String::from(""),
115+
};
116+
117+
let client = match query_type {
118+
QueryType::ReadQuery => Client::new().get(
119+
format!(
120+
"{url}/{endpoint}?db={db}{url_params}",
121+
url = self.url,
122+
endpoint = endpoint,
123+
db = self.database,
124+
url_params = url_params
125+
)
126+
.as_str(),
127+
),
128+
QueryType::WriteQuery => Client::new()
129+
.post(
130+
format!(
131+
"{url}/{endpoint}?db={db}",
132+
url = self.url,
133+
endpoint = endpoint,
134+
db = self.database,
135+
)
136+
.as_str(),
137+
)
138+
.body(query_str),
139+
};
140+
141+
Box::new(
142+
client
143+
.send()
144+
.and_then(|mut res| {
145+
let body = mem::replace(res.body_mut(), Decoder::empty());
146+
body.concat2()
147+
})
148+
.map_err(|err| InfluxDbError::UnspecifiedError {
149+
error: format!("{}", err),
150+
})
151+
.and_then(|body| {
152+
if let Ok(utf8) = std::str::from_utf8(&body) {
153+
let mut s = String::new();
154+
utf8.clone_into(&mut s);
155+
156+
// todo: improve error parsing without serde
157+
if s.contains("\"error\"") {
158+
return futures::future::err(InfluxDbError::UnspecifiedError {
159+
error: format!("influxdb error: \"{}\"", s),
160+
});
161+
}
162+
163+
return futures::future::ok(s);
164+
}
165+
166+
futures::future::err(InfluxDbError::UnspecifiedError {
167+
error: "some other error has happened here!".to_string(),
168+
})
169+
}),
170+
)
171+
}
172+
}

src/error.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#[derive(Debug, Fail)]
2+
/// Errors that might happen in the crate
3+
pub enum InfluxDbError {
4+
#[fail(display = "query must contain at least one field")]
5+
/// Error happens when query has zero fields
6+
InvalidQueryError,
7+
8+
#[fail(
9+
display = "an error happened: \"{}\". this case should be handled better, please file an issue.",
10+
error
11+
)]
12+
/// todo: Error which is a placeholder for more meaningful errors. This should be refactored away.
13+
UnspecifiedError { error: String },
14+
15+
#[fail(display = "InfluxDB encountered the following error: {}", error)]
16+
/// Error which has happened inside InfluxDB
17+
DatabaseError { error: String },
18+
}

src/integrations/serde_integration.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use crate::client::InfluxDbClient;
2+
3+
use serde::de::DeserializeOwned;
4+
5+
use futures::{Future, Stream};
6+
use reqwest::r#async::{Client, Decoder};
7+
8+
use serde_json;
9+
use serde::Deserialize;
10+
use std::mem;
11+
12+
use crate::error::InfluxDbError;
13+
use crate::query::{InfluxDbQuery, QueryType};
14+
15+
#[derive(Deserialize)]
16+
#[doc(hidden)]
17+
struct _DatabaseError {
18+
error: String,
19+
}
20+
21+
#[derive(Deserialize, Debug)]
22+
#[doc(hidden)]
23+
pub struct DatabaseQueryResult<T> {
24+
pub results: Vec<InfluxDbReturn<T>>,
25+
}
26+
27+
#[derive(Deserialize, Debug)]
28+
#[doc(hidden)]
29+
pub struct InfluxDbReturn<T> {
30+
pub series: Option<Vec<InfluxDbSeries<T>>>,
31+
}
32+
33+
#[derive(Deserialize, Debug)]
34+
#[doc(hidden)]
35+
pub struct InfluxDbSeries<T> {
36+
pub name: String,
37+
pub values: Vec<T>,
38+
}
39+
40+
impl InfluxDbClient {
41+
pub fn json_query<T: 'static, Q>(&self, q: Q) -> Box<dyn Future<Item = Option<Vec<InfluxDbSeries<T>>>, Error = InfluxDbError>>
42+
where
43+
Q: InfluxDbQuery,
44+
T: DeserializeOwned,
45+
{
46+
use futures::future;
47+
48+
let query_type = q.get_type();
49+
let endpoint = match query_type {
50+
QueryType::ReadQuery => "query",
51+
QueryType::WriteQuery => "write",
52+
};
53+
54+
let query = match q.build() {
55+
Err(err) => {
56+
let error = InfluxDbError::UnspecifiedError {
57+
error: format!("{}", err),
58+
};
59+
return Box::new(future::err::<Option<Vec<InfluxDbSeries<T>>>, InfluxDbError>(error));
60+
}
61+
Ok(query) => query,
62+
};
63+
64+
let query_str = query.get();
65+
let url_params = match query_type {
66+
QueryType::ReadQuery => format!("&q={}", query_str),
67+
QueryType::WriteQuery => String::from(""),
68+
};
69+
70+
let client = match query_type {
71+
QueryType::ReadQuery => Client::new().get(
72+
format!(
73+
"{url}/{endpoint}?db={db}{url_params}",
74+
url = self.database_url(),
75+
endpoint = endpoint,
76+
db = self.database_name(),
77+
url_params = url_params
78+
)
79+
.as_str(),
80+
),
81+
QueryType::WriteQuery => Client::new()
82+
.post(
83+
format!(
84+
"{url}/{endpoint}?db={db}",
85+
url = self.database_url(),
86+
endpoint = endpoint,
87+
db = self.database_name(),
88+
)
89+
.as_str(),
90+
)
91+
.body(query_str),
92+
};
93+
94+
Box::new(
95+
client
96+
.send()
97+
.and_then(|mut res| {
98+
let body = mem::replace(res.body_mut(), Decoder::empty());
99+
body.concat2()
100+
})
101+
.map_err(|err| InfluxDbError::UnspecifiedError {
102+
error: format!("{}", err)
103+
})
104+
.and_then(|body| {
105+
// Try parsing InfluxDBs { "error": "error message here" }
106+
if let Ok(error) = serde_json::from_slice::<_DatabaseError>(&body) {
107+
return futures::future::err(InfluxDbError::DatabaseError {
108+
error: error.error.to_string()
109+
})
110+
} else {
111+
// Json has another structure, let's try actually parsing it to the type we're deserializing
112+
let from_slice = serde_json::from_slice::<DatabaseQueryResult<T>>(&body);
113+
114+
let mut deserialized = match from_slice {
115+
Ok(deserialized) => deserialized,
116+
Err(err) => return futures::future::err(InfluxDbError::UnspecifiedError {
117+
error: format!("serde error: {}", err)
118+
})
119+
};
120+
121+
return futures::future::result(Ok(deserialized.results.remove(0).series));
122+
}
123+
})
124+
)
125+
}
126+
}

0 commit comments

Comments
 (0)