Skip to content

Commit ef0514a

Browse files
benescha-rodin
authored andcommitted
Add an ErrBuf helper to manage APIs with error strings
1 parent 2b4407c commit ef0514a

File tree

3 files changed

+39
-14
lines changed

3 files changed

+39
-14
lines changed

src/client.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::error::{IsError, KafkaError, KafkaResult};
1818
use crate::groups::GroupList;
1919
use crate::metadata::Metadata;
2020
use crate::statistics::Statistics;
21-
use crate::util::{bytes_cstr_to_owned, timeout_to_ms};
21+
use crate::util::{ErrBuf, timeout_to_ms};
2222

2323
/// Client-level context
2424
///
@@ -116,21 +116,20 @@ impl<C: ClientContext> Client<C> {
116116
pub fn new(config: &ClientConfig, native_config: NativeClientConfig, rd_kafka_type: RDKafkaType,
117117
context: C)
118118
-> KafkaResult<Client<C>> {
119-
let errstr = [0i8; 1024];
119+
let mut err_buf = ErrBuf::new();
120120
let mut boxed_context = Box::new(context);
121121
unsafe { rdsys::rd_kafka_conf_set_opaque(native_config.ptr(), (&mut *boxed_context) as *mut C as *mut c_void) };
122122
unsafe { rdsys::rd_kafka_conf_set_log_cb(native_config.ptr(), Some(native_log_cb::<C>)) };
123123
unsafe { rdsys::rd_kafka_conf_set_stats_cb(native_config.ptr(), Some(native_stats_cb::<C>)) };
124124
unsafe { rdsys::rd_kafka_conf_set_error_cb(native_config.ptr(), Some(native_error_cb::<C>)) };
125125

126126
let client_ptr = unsafe {
127-
rdsys::rd_kafka_new(rd_kafka_type, native_config.ptr_move(), errstr.as_ptr() as *mut c_char, errstr.len())
127+
rdsys::rd_kafka_new(rd_kafka_type, native_config.ptr_move(), err_buf.as_mut_ptr(), err_buf.len())
128128
};
129129
trace!("Create new librdkafka client {:p}", client_ptr);
130130

131131
if client_ptr.is_null() {
132-
let descr = unsafe { bytes_cstr_to_owned(&errstr) };
133-
return Err(KafkaError::ClientCreation(descr));
132+
return Err(KafkaError::ClientCreation(err_buf.to_string()));
134133
}
135134

136135
unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) };

src/config.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,11 @@ use crate::rdsys;
2424

2525
use crate::client::ClientContext;
2626
use crate::error::{KafkaError, KafkaResult, IsError};
27-
use crate::util::bytes_cstr_to_owned;
27+
use crate::util::ErrBuf;
2828

2929
use std::collections::HashMap;
3030
use std::ffi::CString;
3131
use std::mem;
32-
use std::os::raw::c_char;
33-
34-
const ERR_LEN: usize = 256;
3532

3633

3734
/// The log levels supported by librdkafka.
@@ -146,17 +143,16 @@ impl ClientConfig {
146143
/// Returns the native rdkafka-sys configuration.
147144
pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
148145
let conf = unsafe { rdsys::rd_kafka_conf_new() };
149-
let errstr = [0; ERR_LEN];
146+
let mut err_buf = ErrBuf::new();
150147
for (key, value) in &self.conf_map {
151148
let key_c = CString::new(key.to_string())?;
152149
let value_c = CString::new(value.to_string())?;
153150
let ret = unsafe {
154151
rdsys::rd_kafka_conf_set(conf, key_c.as_ptr(), value_c.as_ptr(),
155-
errstr.as_ptr() as *mut c_char, errstr.len())
152+
err_buf.as_mut_ptr(), err_buf.len())
156153
};
157-
if ret.is_error() {
158-
let descr = unsafe { bytes_cstr_to_owned(&errstr) };
159-
return Err(KafkaError::ClientConfig(ret, descr, key.to_string(), value.to_string()));
154+
if ret.is_error() {;
155+
return Err(KafkaError::ClientConfig(ret, err_buf.to_string(), key.to_string(), value.to_string()));
160156
}
161157
}
162158
Ok(unsafe {NativeClientConfig::from_ptr(conf)})

src/util.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,36 @@ pub unsafe fn cstr_to_owned(cstr: *const i8) -> String {
111111
CStr::from_ptr(cstr as *const c_char).to_string_lossy().into_owned()
112112
}
113113

114+
pub(crate) struct ErrBuf {
115+
buf: [c_char; ErrBuf::MAX_ERR_LEN]
116+
}
117+
118+
impl ErrBuf {
119+
const MAX_ERR_LEN: usize = 512;
120+
121+
pub fn new() -> ErrBuf {
122+
ErrBuf { buf: [0; ErrBuf::MAX_ERR_LEN] }
123+
}
124+
125+
pub fn as_mut_ptr(&mut self) -> *mut i8 {
126+
self.buf.as_mut_ptr()
127+
}
128+
129+
pub fn len(&self) -> usize {
130+
self.buf.len()
131+
}
132+
133+
pub fn to_string(&self) -> String {
134+
unsafe { bytes_cstr_to_owned(&self.buf) }
135+
}
136+
}
137+
138+
impl Default for ErrBuf {
139+
fn default() -> ErrBuf {
140+
ErrBuf::new()
141+
}
142+
}
143+
114144
#[cfg(test)]
115145
mod tests {
116146
use super::*;

0 commit comments

Comments
 (0)