Skip to content

Update rpclib to version 2.3.0. #799

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libraries/rpclib/src/rpc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class client {
//!
//! \param func_name The name of the notification to call.
//! \param args The arguments to pass to the function.
//! \tparam Args THe types of the arguments.
//! \tparam Args The types of the arguments.
//!
//! \note This function returns immediately (possibly before the
//! notification is written to the socket).
Expand Down Expand Up @@ -144,4 +144,4 @@ class client {
};
}

//#include "rpc/client.inl"
#include "rpc/client.inl"
2 changes: 1 addition & 1 deletion libraries/rpclib/src/rpc/client.inl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ client::async_call(std::string const &func_name, Args... args) {
//! \param args The arguments to pass to the function.
//! \note This function returns when the notification is written to the
//! socket.
//! \tparam Args THe types of the arguments.
//! \tparam Args The types of the arguments.
template <typename... Args>
void client::send(std::string const &func_name, Args... args) {
RPCLIB_CREATE_LOG_CHANNEL(client)
Expand Down
50 changes: 29 additions & 21 deletions libraries/rpclib/src/rpc/detail/async_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@ class async_writer : public std::enable_shared_from_this<async_writer> {
RPCLIB_ASIO::ip::tcp::socket socket)
: socket_(std::move(socket)), write_strand_(*io), exit_(false) {}

void close() {
exit_ = true;

auto self = shared_from_this();
write_strand_.post([this, self]() {
LOG_INFO("Closing socket");
std::error_code e;
socket_.shutdown(
RPCLIB_ASIO::ip::tcp::socket::shutdown_both, e);
if (e) {
LOG_WARN("std::system_error during socket shutdown. "
"Code: {}. Message: {}", e.value(), e.message());
}
socket_.close();
});
}

bool is_closed() const {
return exit_.load();
}

void do_write() {
if (exit_) {
return;
Expand All @@ -46,20 +67,6 @@ class async_writer : public std::enable_shared_from_this<async_writer> {
} else {
LOG_ERROR("Error while writing to socket: {}", ec);
}

if (exit_) {
LOG_INFO("Closing socket");
try {
socket_.shutdown(
RPCLIB_ASIO::ip::tcp::socket::shutdown_both);
}
catch (std::system_error &e) {
(void)e;
LOG_WARN("std::system_error during socket shutdown. "
"Code: {}. Message: {}", e.code(), e.what());
}
socket_.close();
}
}));
}

Expand All @@ -72,23 +79,24 @@ class async_writer : public std::enable_shared_from_this<async_writer> {
do_write();
}

friend class rpc::client;
RPCLIB_ASIO::ip::tcp::socket& socket() {
return socket_;
}

protected:
template <typename Derived>
std::shared_ptr<Derived> shared_from_base() {
return std::static_pointer_cast<Derived>(shared_from_this());
}

protected:
RPCLIB_ASIO::strand& write_strand() {
return write_strand_;
}

private:
RPCLIB_ASIO::ip::tcp::socket socket_;
RPCLIB_ASIO::strand write_strand_;
std::atomic_bool exit_{false};
bool exited_ = false;
std::mutex m_exit_;
std::condition_variable cv_exit_;

private:
std::deque<RPCLIB_MSGPACK::sbuffer> write_queue_;
RPCLIB_CREATE_LOG_CHANNEL(async_writer)
};
Expand Down
15 changes: 15 additions & 0 deletions libraries/rpclib/src/rpc/detail/client_error.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include "format.h"

#include "rpc/detail/client_error.h"

namespace rpc {
namespace detail {

client_error::client_error(code c, const std::string &msg)
: what_(RPCLIB_FMT::format("client error C{0:04x}: {1}",
static_cast<uint16_t>(c), msg)) {}

const char *client_error::what() const noexcept { return what_.c_str(); }
}
}

10 changes: 9 additions & 1 deletion libraries/rpclib/src/rpc/detail/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,19 @@ class logger {
std::stringstream ss;
timespec now_t = {};
clock_gettime(CLOCK_REALTIME, &now_t);
#if __GNUC__ >= 5
ss << std::put_time(
std::localtime(reinterpret_cast<time_t *>(&now_t.tv_sec)),
"%F %T")
<< RPCLIB_FMT::format(
#else
char mltime[128];
strftime(mltime, sizeof(mltime), "%c %Z",
std::localtime(reinterpret_cast<time_t *>(&now_t.tv_sec)));
ss << mltime
#endif
<< RPCLIB_FMT::format(
".{:03}", round(static_cast<double>(now_t.tv_nsec) / 1.0e6));

return ss.str();
}
#endif
Expand Down
65 changes: 65 additions & 0 deletions libraries/rpclib/src/rpc/detail/response.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include "rpc/detail/response.h"
#include "rpc/detail/log.h"
#include "rpc/detail/util.h"

#include <assert.h>

namespace rpc {
namespace detail {

response::response() : id_(0), error_(), result_(), empty_(false) {}

response::response(RPCLIB_MSGPACK::object_handle o) : response() {
response_type r;
o.get().convert(r);
// TODO: check protocol [t.szelei 2015-12-30]
id_ = std::get<1>(r);
auto &&error_obj = std::get<2>(r);
if (!error_obj.is_nil()) {
error_ = std::make_shared<RPCLIB_MSGPACK::object_handle>();
*error_ = RPCLIB_MSGPACK::clone(error_obj);
}
result_ = std::make_shared<RPCLIB_MSGPACK::object_handle>(
std::get<3>(r), std::move(o.zone()));
}

RPCLIB_MSGPACK::sbuffer response::get_data() const {
RPCLIB_MSGPACK::sbuffer data;
response_type r(1, id_, error_ ? error_->get() : RPCLIB_MSGPACK::object(),
result_ ? result_->get() : RPCLIB_MSGPACK::object());
RPCLIB_MSGPACK::pack(data, r);
return data;
}

uint32_t response::get_id() const { return id_; }

std::shared_ptr<RPCLIB_MSGPACK::object_handle> response::get_error() const { return error_; }

std::shared_ptr<RPCLIB_MSGPACK::object_handle> response::get_result() const {
return result_;
}

response response::empty() {
response r;
r.empty_ = true;
return r;
}

bool response::is_empty() const { return empty_; }

void response::capture_result(RPCLIB_MSGPACK::object_handle &r) {
if (!result_) {
result_ = std::make_shared<RPCLIB_MSGPACK::object_handle>();
}
result_->set(std::move(r).get());
}

void response::capture_error(RPCLIB_MSGPACK::object_handle &e) {
if (!error_) {
error_ = std::shared_ptr<RPCLIB_MSGPACK::object_handle>();
}
error_->set(std::move(e).get());
}

} /* detail */
} /* rpc */
7 changes: 6 additions & 1 deletion libraries/rpclib/src/rpc/detail/server_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#ifndef SESSION_H_5KG6ZMAB
#define SESSION_H_5KG6ZMAB

#include "asio.hpp"
#include <memory>
#include <vector>

Expand All @@ -21,7 +22,9 @@ namespace detail {

class server_session : public async_writer {
public:
server_session(server *srv, std::shared_ptr<dispatcher> disp, bool suppress_exceptions);
server_session(server *srv, RPCLIB_ASIO::io_service *io,
RPCLIB_ASIO::ip::tcp::socket socket,
std::shared_ptr<dispatcher> disp, bool suppress_exceptions);
void start();

void close();
Expand All @@ -31,6 +34,8 @@ class server_session : public async_writer {

private:
server* parent_;
RPCLIB_ASIO::io_service *io_;
RPCLIB_ASIO::strand read_strand_;
std::shared_ptr<dispatcher> disp_;
RPCLIB_MSGPACK::unpacker pac_;
RPCLIB_MSGPACK::sbuffer output_buf_;
Expand Down
143 changes: 143 additions & 0 deletions libraries/rpclib/src/rpc/dispatcher.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#include "rpc/dispatcher.h"
#include "format.h"
#include "rpc/detail/client_error.h"
#include "rpc/this_handler.h"

namespace rpc {
namespace detail {

using detail::response;

void dispatcher::dispatch(RPCLIB_MSGPACK::sbuffer const &msg) {
auto unpacked = RPCLIB_MSGPACK::unpack(msg.data(), msg.size());
dispatch(unpacked.get());
}

response dispatcher::dispatch(RPCLIB_MSGPACK::object const &msg,
bool suppress_exceptions) {
switch (msg.via.array.size) {
case 3:
return dispatch_notification(msg, suppress_exceptions);
case 4:
return dispatch_call(msg, suppress_exceptions);
default:
return response::empty();
}
}

response dispatcher::dispatch_call(RPCLIB_MSGPACK::object const &msg,
bool suppress_exceptions) {
call_t the_call;
msg.convert(the_call);

// TODO: proper validation of protocol (and responding to it)
// auto &&type = std::get<0>(the_call);
// assert(type == 0);

auto &&id = std::get<1>(the_call);
auto &&name = std::get<2>(the_call);
auto &&args = std::get<3>(the_call);

auto it_func = funcs_.find(name);

if (it_func != end(funcs_)) {
LOG_DEBUG("Dispatching call to '{}'", name);
try {
auto result = (it_func->second)(args);
return response::make_result(id, std::move(result));
} catch (rpc::detail::client_error &e) {
return response::make_error(
id, RPCLIB_FMT::format("rpclib: {}", e.what()));
} catch (std::exception &e) {
if (!suppress_exceptions) {
throw;
}
return response::make_error(
id,
RPCLIB_FMT::format("rpclib: function '{0}' (called with {1} "
"arg(s)) "
"threw an exception. The exception "
"contained this information: {2}.",
name, args.via.array.size, e.what()));
} catch (rpc::detail::handler_error &) {
// doing nothing, the exception was only thrown to
// return immediately
} catch (rpc::detail::handler_spec_response &) {
// doing nothing, the exception was only thrown to
// return immediately
} catch (...) {
if (!suppress_exceptions) {
throw;
}
return response::make_error(
id,
RPCLIB_FMT::format("rpclib: function '{0}' (called with {1} "
"arg(s)) threw an exception. The exception "
"is not derived from std::exception. No "
"further information available.",
name, args.via.array.size));
}
}
return response::make_error(
id, RPCLIB_FMT::format("rpclib: server could not find "
"function '{0}' with argument count {1}.",
name, args.via.array.size));
}

response dispatcher::dispatch_notification(RPCLIB_MSGPACK::object const &msg,
bool suppress_exceptions) {
notification_t the_call;
msg.convert(the_call);

// TODO: proper validation of protocol (and responding to it)
// auto &&type = std::get<0>(the_call);
// assert(type == static_cast<uint8_t>(request_type::notification));

auto &&name = std::get<1>(the_call);
auto &&args = std::get<2>(the_call);

auto it_func = funcs_.find(name);

if (it_func != end(funcs_)) {
LOG_DEBUG("Dispatching call to '{}'", name);
try {
auto result = (it_func->second)(args);
} catch (rpc::detail::handler_error &) {
// doing nothing, the exception was only thrown to
// return immediately
} catch (rpc::detail::handler_spec_response &) {
// doing nothing, the exception was only thrown to
// return immediately
} catch (...) {
if (!suppress_exceptions) {
throw;
}
}
}
return response::empty();
}

void dispatcher::enforce_arg_count(std::string const &func, std::size_t found,
std::size_t expected) {
using detail::client_error;
if (found != expected) {
throw client_error(
client_error::code::wrong_arity,
RPCLIB_FMT::format(
"Function '{0}' was called with an invalid number of "
"arguments. Expected: {1}, got: {2}",
func, expected, found));
}
}

void dispatcher::enforce_unique_name(std::string const &func) {
auto pos = funcs_.find(func);
if (pos != end(funcs_)) {
throw std::logic_error(
RPCLIB_FMT::format("Function name already bound: '{}'. "
"Please use unique function names", func));
}
}

}
} /* rpc */
13 changes: 13 additions & 0 deletions libraries/rpclib/src/rpc/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ class dispatcher {
detail::tags::nonvoid_result const &,
detail::tags::nonzero_arg const &);

//! \brief Unbind a functor with a given name from callable functors.
void unbind(std::string const &name) {
funcs_.erase(name);
}

//! \brief returns a list of all names which functors are binded to
std::vector<std::string> names() const {
std::vector<std::string> names;
for(auto it = funcs_.begin(); it != funcs_.end(); ++it)
names.push_back(it->first);
return names;
}

//! @}

//! \brief Processes a message that contains a call according to
Expand Down
Loading