Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Update rpclib to version 2.3.0.#799

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
facchinm merged 1 commit intomainfromrpclib_update
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletionslibraries/rpclib/src/rpc/client.h
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -88,7 +88,7 @@ class client {
//!
//! \param func_name The name of the notification to call.
//! \param args The arguments to pass to the function.
//! \tparam ArgsTHe types of the arguments.
//! \tparam ArgsThe types of the arguments.
//!
//! \note This function returns immediately (possibly before the
//! notification is written to the socket).
Expand DownExpand Up@@ -144,4 +144,4 @@ class client {
};
}

//#include "rpc/client.inl"
#include "rpc/client.inl"
2 changes: 1 addition & 1 deletionlibraries/rpclib/src/rpc/client.inl
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -47,7 +47,7 @@ client::async_call(std::string const &func_name, Args... args) {
//! \param args The arguments to pass to the function.
//! \note This function returns when the notification is written to the
//! socket.
//! \tparam ArgsTHe types of the arguments.
//! \tparam ArgsThe types of the arguments.
template <typename... Args>
void client::send(std::string const &func_name, Args... args) {
RPCLIB_CREATE_LOG_CHANNEL(client)
Expand Down
50 changes: 29 additions & 21 deletionslibraries/rpclib/src/rpc/detail/async_writer.h
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -23,6 +23,27 @@ class async_writer : public std::enable_shared_from_this<async_writer> {
RPCLIB_ASIO::ip::tcp::socket socket)
: socket_(std::move(socket)), write_strand_(*io), exit_(false) {}

void close() {
exit_ = true;

auto self = shared_from_this();
write_strand_.post([this, self]() {
LOG_INFO("Closing socket");
std::error_code e;
socket_.shutdown(
RPCLIB_ASIO::ip::tcp::socket::shutdown_both, e);
if (e) {
LOG_WARN("std::system_error during socket shutdown. "
"Code: {}. Message: {}", e.value(), e.message());
}
socket_.close();
});
}

bool is_closed() const {
return exit_.load();
}

void do_write() {
if (exit_) {
return;
Expand All@@ -46,20 +67,6 @@ class async_writer : public std::enable_shared_from_this<async_writer> {
} else {
LOG_ERROR("Error while writing to socket: {}", ec);
}

if (exit_) {
LOG_INFO("Closing socket");
try {
socket_.shutdown(
RPCLIB_ASIO::ip::tcp::socket::shutdown_both);
}
catch (std::system_error &e) {
(void)e;
LOG_WARN("std::system_error during socket shutdown. "
"Code: {}. Message: {}", e.code(), e.what());
}
socket_.close();
}
}));
}

Expand All@@ -72,23 +79,24 @@ class async_writer : public std::enable_shared_from_this<async_writer> {
do_write();
}

friend class rpc::client;
RPCLIB_ASIO::ip::tcp::socket& socket() {
return socket_;
}

protected:
template <typename Derived>
std::shared_ptr<Derived> shared_from_base() {
return std::static_pointer_cast<Derived>(shared_from_this());
}

protected:
RPCLIB_ASIO::strand& write_strand() {
return write_strand_;
}

private:
RPCLIB_ASIO::ip::tcp::socket socket_;
RPCLIB_ASIO::strand write_strand_;
std::atomic_bool exit_{false};
bool exited_ = false;
std::mutex m_exit_;
std::condition_variable cv_exit_;

private:
std::deque<RPCLIB_MSGPACK::sbuffer> write_queue_;
RPCLIB_CREATE_LOG_CHANNEL(async_writer)
};
Expand Down
15 changes: 15 additions & 0 deletionslibraries/rpclib/src/rpc/detail/client_error.cc
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
#include "format.h"

#include "rpc/detail/client_error.h"

namespace rpc {
namespace detail {

client_error::client_error(code c, const std::string &msg)
: what_(RPCLIB_FMT::format("client error C{0:04x}: {1}",
static_cast<uint16_t>(c), msg)) {}

const char *client_error::what() const noexcept { return what_.c_str(); }
}
}

10 changes: 9 additions & 1 deletionlibraries/rpclib/src/rpc/detail/log.h
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -83,11 +83,19 @@ class logger {
std::stringstream ss;
timespec now_t = {};
clock_gettime(CLOCK_REALTIME, &now_t);
#if __GNUC__ >= 5
ss << std::put_time(
std::localtime(reinterpret_cast<time_t *>(&now_t.tv_sec)),
"%F %T")
<< RPCLIB_FMT::format(
#else
char mltime[128];
strftime(mltime, sizeof(mltime), "%c %Z",
std::localtime(reinterpret_cast<time_t *>(&now_t.tv_sec)));
ss << mltime
#endif
<< RPCLIB_FMT::format(
".{:03}", round(static_cast<double>(now_t.tv_nsec) / 1.0e6));

return ss.str();
}
#endif
Expand Down
65 changes: 65 additions & 0 deletionslibraries/rpclib/src/rpc/detail/response.cc
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
#include "rpc/detail/response.h"
#include "rpc/detail/log.h"
#include "rpc/detail/util.h"

#include <assert.h>

namespace rpc {
namespace detail {

response::response() : id_(0), error_(), result_(), empty_(false) {}

response::response(RPCLIB_MSGPACK::object_handle o) : response() {
response_type r;
o.get().convert(r);
// TODO: check protocol [t.szelei 2015-12-30]
id_ = std::get<1>(r);
auto &&error_obj = std::get<2>(r);
if (!error_obj.is_nil()) {
error_ = std::make_shared<RPCLIB_MSGPACK::object_handle>();
*error_ = RPCLIB_MSGPACK::clone(error_obj);
}
result_ = std::make_shared<RPCLIB_MSGPACK::object_handle>(
std::get<3>(r), std::move(o.zone()));
}

RPCLIB_MSGPACK::sbuffer response::get_data() const {
RPCLIB_MSGPACK::sbuffer data;
response_type r(1, id_, error_ ? error_->get() : RPCLIB_MSGPACK::object(),
result_ ? result_->get() : RPCLIB_MSGPACK::object());
RPCLIB_MSGPACK::pack(data, r);
return data;
}

uint32_t response::get_id() const { return id_; }

std::shared_ptr<RPCLIB_MSGPACK::object_handle> response::get_error() const { return error_; }

std::shared_ptr<RPCLIB_MSGPACK::object_handle> response::get_result() const {
return result_;
}

response response::empty() {
response r;
r.empty_ = true;
return r;
}

bool response::is_empty() const { return empty_; }

void response::capture_result(RPCLIB_MSGPACK::object_handle &r) {
if (!result_) {
result_ = std::make_shared<RPCLIB_MSGPACK::object_handle>();
}
result_->set(std::move(r).get());
}

void response::capture_error(RPCLIB_MSGPACK::object_handle &e) {
if (!error_) {
error_ = std::shared_ptr<RPCLIB_MSGPACK::object_handle>();
}
error_->set(std::move(e).get());
}

} /* detail */
} /* rpc */
7 changes: 6 additions & 1 deletionlibraries/rpclib/src/rpc/detail/server_session.h
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -3,6 +3,7 @@
#ifndef SESSION_H_5KG6ZMAB
#define SESSION_H_5KG6ZMAB

#include "asio.hpp"
#include <memory>
#include <vector>

Expand All@@ -21,7 +22,9 @@ namespace detail {

class server_session : public async_writer {
public:
server_session(server *srv, std::shared_ptr<dispatcher> disp, bool suppress_exceptions);
server_session(server *srv, RPCLIB_ASIO::io_service *io,
RPCLIB_ASIO::ip::tcp::socket socket,
std::shared_ptr<dispatcher> disp, bool suppress_exceptions);
void start();

void close();
Expand All@@ -31,6 +34,8 @@ class server_session : public async_writer {

private:
server* parent_;
RPCLIB_ASIO::io_service *io_;
RPCLIB_ASIO::strand read_strand_;
std::shared_ptr<dispatcher> disp_;
RPCLIB_MSGPACK::unpacker pac_;
RPCLIB_MSGPACK::sbuffer output_buf_;
Expand Down
143 changes: 143 additions & 0 deletionslibraries/rpclib/src/rpc/dispatcher.cc
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
#include "rpc/dispatcher.h"
#include "format.h"
#include "rpc/detail/client_error.h"
#include "rpc/this_handler.h"

namespace rpc {
namespace detail {

using detail::response;

void dispatcher::dispatch(RPCLIB_MSGPACK::sbuffer const &msg) {
auto unpacked = RPCLIB_MSGPACK::unpack(msg.data(), msg.size());
dispatch(unpacked.get());
}

response dispatcher::dispatch(RPCLIB_MSGPACK::object const &msg,
bool suppress_exceptions) {
switch (msg.via.array.size) {
case 3:
return dispatch_notification(msg, suppress_exceptions);
case 4:
return dispatch_call(msg, suppress_exceptions);
default:
return response::empty();
}
}

response dispatcher::dispatch_call(RPCLIB_MSGPACK::object const &msg,
bool suppress_exceptions) {
call_t the_call;
msg.convert(the_call);

// TODO: proper validation of protocol (and responding to it)
// auto &&type = std::get<0>(the_call);
// assert(type == 0);

auto &&id = std::get<1>(the_call);
auto &&name = std::get<2>(the_call);
auto &&args = std::get<3>(the_call);

auto it_func = funcs_.find(name);

if (it_func != end(funcs_)) {
LOG_DEBUG("Dispatching call to '{}'", name);
try {
auto result = (it_func->second)(args);
return response::make_result(id, std::move(result));
} catch (rpc::detail::client_error &e) {
return response::make_error(
id, RPCLIB_FMT::format("rpclib: {}", e.what()));
} catch (std::exception &e) {
if (!suppress_exceptions) {
throw;
}
return response::make_error(
id,
RPCLIB_FMT::format("rpclib: function '{0}' (called with {1} "
"arg(s)) "
"threw an exception. The exception "
"contained this information: {2}.",
name, args.via.array.size, e.what()));
} catch (rpc::detail::handler_error &) {
// doing nothing, the exception was only thrown to
// return immediately
} catch (rpc::detail::handler_spec_response &) {
// doing nothing, the exception was only thrown to
// return immediately
} catch (...) {
if (!suppress_exceptions) {
throw;
}
return response::make_error(
id,
RPCLIB_FMT::format("rpclib: function '{0}' (called with {1} "
"arg(s)) threw an exception. The exception "
"is not derived from std::exception. No "
"further information available.",
name, args.via.array.size));
}
}
return response::make_error(
id, RPCLIB_FMT::format("rpclib: server could not find "
"function '{0}' with argument count {1}.",
name, args.via.array.size));
}

response dispatcher::dispatch_notification(RPCLIB_MSGPACK::object const &msg,
bool suppress_exceptions) {
notification_t the_call;
msg.convert(the_call);

// TODO: proper validation of protocol (and responding to it)
// auto &&type = std::get<0>(the_call);
// assert(type == static_cast<uint8_t>(request_type::notification));

auto &&name = std::get<1>(the_call);
auto &&args = std::get<2>(the_call);

auto it_func = funcs_.find(name);

if (it_func != end(funcs_)) {
LOG_DEBUG("Dispatching call to '{}'", name);
try {
auto result = (it_func->second)(args);
} catch (rpc::detail::handler_error &) {
// doing nothing, the exception was only thrown to
// return immediately
} catch (rpc::detail::handler_spec_response &) {
// doing nothing, the exception was only thrown to
// return immediately
} catch (...) {
if (!suppress_exceptions) {
throw;
}
}
}
return response::empty();
}

void dispatcher::enforce_arg_count(std::string const &func, std::size_t found,
std::size_t expected) {
using detail::client_error;
if (found != expected) {
throw client_error(
client_error::code::wrong_arity,
RPCLIB_FMT::format(
"Function '{0}' was called with an invalid number of "
"arguments. Expected: {1}, got: {2}",
func, expected, found));
}
}

void dispatcher::enforce_unique_name(std::string const &func) {
auto pos = funcs_.find(func);
if (pos != end(funcs_)) {
throw std::logic_error(
RPCLIB_FMT::format("Function name already bound: '{}'. "
"Please use unique function names", func));
}
}

}
} /* rpc */
13 changes: 13 additions & 0 deletionslibraries/rpclib/src/rpc/dispatcher.h
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -60,6 +60,19 @@ class dispatcher {
detail::tags::nonvoid_result const &,
detail::tags::nonzero_arg const &);

//! \brief Unbind a functor with a given name from callable functors.
void unbind(std::string const &name) {
funcs_.erase(name);
}

//! \brief returns a list of all names which functors are binded to
std::vector<std::string> names() const {
std::vector<std::string> names;
for(auto it = funcs_.begin(); it != funcs_.end(); ++it)
names.push_back(it->first);
return names;
}

//! @}

//! \brief Processes a message that contains a call according to
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp