Skip to content

Commit f983188

Browse files
author
Ignacio Corderi
committed
Added GetLog command and KineticError implements Error
1 parent 94afd57 commit f983188

File tree

8 files changed

+791
-28
lines changed

8 files changed

+791
-28
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22

33
name = "kinetic-rust"
4-
version = "0.0.8"
4+
version = "0.0.9"
55
authors = ["Ignacio Corderi <[email protected]>"]
66
homepage = "https://github.com/icorderi/kinetic-rust/"
77
repository = "https://github.com/icorderi/kinetic-rust/"

src/kinetic-bench/main.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ static USAGE: &'static str = "
4040
Kinetic from Rust!
4141
4242
Usage: kinetic-bench write <target> [<count>]
43+
kinetic-bench config <target>
4344
kinetic-bench [options]
4445
4546
Options:
@@ -50,9 +51,10 @@ Options:
5051
#[deriving(Decodable, Show)]
5152
struct Args {
5253
cmd_write: Option<WriteArgs>,
53-
cmd_read: Option<ReadArgs>,
54+
cmd_read: Option<TargetArgs>,
55+
cmd_config: Option<TargetArgs>,
5456
flag_help: bool,
55-
flag_version: bool
57+
flag_version: bool,
5658
}
5759

5860
#[deriving(Decodable, Show)]
@@ -62,7 +64,7 @@ struct WriteArgs{
6264
}
6365

6466
#[deriving(Decodable, Show)]
65-
struct ReadArgs{
67+
struct TargetArgs{
6668
arg_target: String
6769
}
6870

@@ -88,6 +90,12 @@ fn main() {
8890
return;
8991
}
9092

93+
if args.cmd_config.is_some() {
94+
let c = kinetic::Client::connect(format!("{}:8123", args.cmd_config.unwrap().arg_target).as_slice()).unwrap();
95+
println!("{}", c.get_config());
96+
return;
97+
}
98+
9199
if args.cmd_write.is_none() {
92100
println!("{}", USAGE);
93101
return;

src/kinetic/client.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ type KineticResponse = (::proto::Message, ::proto::Command, ::std::vec::Vec<u8>)
4141
#[experimental]
4242
struct KineticChannel {
4343
stream: io::TcpStream,
44-
writer_tx: ::std::comm::SyncSender<KineticCommand>
44+
writer_tx: ::std::comm::SyncSender<KineticCommand>,
45+
pub configuration: ::proto::command::log::Configuration,
46+
pub limits: ::proto::command::log::Limits,
4547
}
4648

4749
#[experimental]
@@ -64,6 +66,9 @@ impl KineticChannel {
6466
// Handshake
6567
let (_, cmd, _) = ::network::recv(&mut s).unwrap();
6668
let connection_id = cmd.get_header().get_connectionID();
69+
let mut the_log = cmd.unwrap_body().unwrap_getLog();
70+
let configuration = the_log.take_configuration();
71+
let limits = the_log.take_limits();
6772

6873
// Other state like pending requests...
6974
let pending_mutex = Arc::new(Mutex::new(collections::HashMap::with_capacity(max_pending)));
@@ -128,7 +133,8 @@ impl KineticChannel {
128133
hmac.input(&buffer);
129134
hmac.input(cmd_bytes.as_slice());
130135

131-
auth.set_hmac(hmac.result().code().to_vec()); // TODO: Code is backed by a Vec, we should have a method to get it.
136+
// TODO: Code is backed by a Vec, we should have an unwrap() method to get it.
137+
auth.set_hmac(hmac.result().code().to_vec());
132138
msg.set_hmacAuth(auth);
133139

134140
msg.set_commandBytes(cmd_bytes);
@@ -137,13 +143,15 @@ impl KineticChannel {
137143
let mut pending = pending_mutex.lock();
138144
pending.insert(seq, callback);
139145
}
146+
140147
let value = value.unwrap_or(vec::Vec::new());
141148
::network::send(&mut writer, &msg, value.as_slice()).unwrap();
142149
seq += 1;
143150
}
144151
});
145152

146-
Ok(KineticChannel { stream: s, writer_tx: w_tx})
153+
Ok(KineticChannel { stream: s, writer_tx: w_tx,
154+
configuration: configuration, limits: limits })
147155
}
148156

149157
#[experimental]
@@ -179,6 +187,16 @@ impl Client {
179187
cluster_version: 0 })
180188
}
181189

190+
// FIXME : Solve this without cloning
191+
pub fn get_config(&self) -> ::proto::command::log::Configuration {
192+
self.channel.configuration.clone()
193+
}
194+
195+
// FIXME : Solve this without cloning
196+
pub fn get_limits(&self) -> ::proto::command::log::Limits {
197+
self.channel.limits.clone()
198+
}
199+
182200
#[inline]
183201
fn send_raw<R : Response, C: Command<R>> (&self, cmd: C) -> Receiver<KineticResponse> {
184202
// build specific command

src/kinetic/commands.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ use core::Command;
2828
use std::vec;
2929

3030
/// Requests the value stored with the given key
31-
#[experimental]
31+
#[unstable]
3232
pub struct Get {
3333
pub key: vec::Vec<u8>
3434
}
3535

36-
#[experimental]
36+
#[unstable]
3737
impl Command<::responses::GetResponse> for Get {
3838

3939
fn build_proto(self) -> (::proto::Command, Option<vec::Vec<u8>>) {
@@ -59,13 +59,13 @@ impl Command<::responses::GetResponse> for Get {
5959
}
6060

6161
/// Stores the value asociated with the key
62-
#[experimental]
62+
#[unstable]
6363
pub struct Put {
6464
pub key: vec::Vec<u8>,
6565
pub value: vec::Vec<u8>
6666
}
6767

68-
#[experimental]
68+
#[unstable]
6969
impl Command<()> for Put {
7070

7171
fn build_proto(self) -> (::proto::Command, Option<vec::Vec<u8>>) {
@@ -93,3 +93,35 @@ impl Command<()> for Put {
9393
}
9494

9595
}
96+
97+
/// Stores the value asociated with the key
98+
#[unstable]
99+
pub struct GetLog {
100+
// FIXME: The operation actually accepts a **set** of types
101+
pub log_type: ::proto::command::LogType
102+
}
103+
104+
#[unstable]
105+
impl Command<::proto::command::GetLog> for GetLog {
106+
107+
fn build_proto(self) -> (::proto::Command, Option<vec::Vec<u8>>) {
108+
let mut cmd = ::proto::Command::new();
109+
let mut header = ::proto::command::Header::new();
110+
111+
// Set command type
112+
header.set_messageType(::proto::command::MessageType::GETLOG);
113+
cmd.set_header(header);
114+
115+
// Build the actual command
116+
let mut get_log = ::proto::command::GetLog::new();
117+
get_log.set_types(vec![self.log_type]);
118+
119+
// Fill the body
120+
let mut body = ::proto::command::Body::new();
121+
body.set_getLog(get_log);
122+
cmd.set_body(body);
123+
124+
(cmd, None) // return command
125+
}
126+
127+
}

src/kinetic/error.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,51 @@
2222

2323
#![stable]
2424

25+
use std::error::Error;
2526
use std::error::FromError;
26-
use protobuf::error::ProtobufError;
27-
use proto::StatusCode;
2827
use std::io::IoError;
28+
use protobuf::error::ProtobufError;
29+
use proto::command::Status;
2930

30-
31-
#[deriving(Show,Eq,PartialEq)]
3231
#[stable]
32+
#[deriving(Show)]
3333
pub enum KineticError {
3434
IoError(IoError),
3535
ProtobufError(ProtobufError),
3636
InvalidMagicNumber,
37-
RemoteError(StatusCode, String)
37+
RemoteError(Status)
38+
}
39+
40+
impl Error for KineticError {
41+
fn description(&self) -> &str {
42+
match *self {
43+
KineticError::IoError(_) => "An I/O error occurred",
44+
KineticError::ProtobufError(_) => "There was an error with the protobuf library",
45+
KineticError::InvalidMagicNumber => "Invalid magic number received",
46+
KineticError::RemoteError(ref status) =>
47+
format!("{}: {}", status.get_code(), status.get_statusMessage()).as_slice().clone(),
48+
}
49+
}
50+
51+
fn detail(&self) -> Option<String> {
52+
match *self {
53+
KineticError::IoError(ref err) => Some(err.description().to_string()),
54+
KineticError::ProtobufError(ref err) => Some(err.description().to_string()),
55+
KineticError::RemoteError(ref status) =>
56+
if status.has_detailedMessage() {
57+
String::from_utf8(status.get_detailedMessage().to_vec()).ok() }
58+
else { None },
59+
_ => None,
60+
}
61+
}
62+
63+
fn cause(&self) -> Option<&Error> {
64+
match *self {
65+
KineticError::IoError(ref err) => Some(err as &Error),
66+
KineticError::ProtobufError(ref err) => Some(err as &Error),
67+
_ => None,
68+
}
69+
}
3870
}
3971

4072
#[stable]

src/kinetic/proto/mod.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,18 @@ pub use proto::raw::Message;
3030

3131
mod raw;
3232

33+
#[stable]
3334
pub mod message {
3435

35-
#![stable]
36-
3736
pub use proto::raw::Message_AuthType as AuthType;
3837
pub use proto::raw::Message_HMACauth as HmacAuth;
3938
pub use proto::raw::Message_PINauth as PinAuth;
4039

4140
}
4241

42+
#[stable]
4343
pub mod command {
4444

45-
#![stable]
46-
4745
pub use proto::raw::Command_Header as Header;
4846
pub use proto::raw::Command_MessageType as MessageType;
4947
pub use proto::raw::Command_Body as Body;
@@ -53,6 +51,16 @@ pub mod command {
5351
pub use proto::raw::Command_Algorithm as Algorithm;
5452
pub use proto::raw::Command_Synchronization as Synchronization;
5553

54+
pub use proto::raw::Command_GetLog as GetLog;
55+
pub use proto::raw::Command_GetLog_Type as LogType;
56+
57+
#[unstable]
58+
pub mod log {
59+
60+
pub use proto::raw::Command_GetLog_Configuration as Configuration;
61+
pub use proto::raw::Command_GetLog_Limits as Limits;
62+
63+
}
5664
}
5765

5866
/// Returns the version of the Kinetic Protocol

0 commit comments

Comments
 (0)