[fix] resolved several issues found by AI
Some checks failed
Verification / Is-Buildable (push) Failing after 2m25s

This commit is contained in:
Artur Mukhamadiev 2026-04-21 15:29:33 +03:00
parent be4dc501ae
commit f4eb57cd36
7 changed files with 186 additions and 80 deletions

View File

@ -1,11 +1,12 @@
#pragma once #pragma once
#include "export.h"
#include <functional> #include <functional>
#include <jsonrpccxx/server.hpp> #include <jsonrpccxx/server.hpp>
#include <map> #include <map>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include <string> #include <string>
#include "export.h" #include <variant>
extern "C" { extern "C" {
struct rpc_string { struct rpc_string {
@ -20,7 +21,8 @@ namespace score {
class CRPC_EXPORT RpcServer { class CRPC_EXPORT RpcServer {
public: public:
using Handler = std::function<nlohmann::json(const nlohmann::json &)>; using Handler = std::function<std::variant<nlohmann::json, std::string>(
const nlohmann::json &)>;
using callback_t = rpc_string *(*)(rpc_string *); using callback_t = rpc_string *(*)(rpc_string *);
void register_method(const std::string &name, Handler handler); void register_method(const std::string &name, Handler handler);
@ -32,4 +34,4 @@ class CRPC_EXPORT RpcServer {
std::map<std::string, Handler> handlers_; std::map<std::string, Handler> handlers_;
}; };
} // namespace cloud_point_rpc } // namespace score

View File

@ -1,15 +1,15 @@
#pragma once #pragma once
#include "export.h"
#include <asio.hpp> #include <asio.hpp>
#include <atomic> #include <atomic>
#include <cloud_point_rpc/tcp_read.hpp> #include <cloud_point_rpc/tcp_read.hpp>
#include <functional> #include <functional>
#include <glog/logging.h> #include <glog/logging.h>
#include <string>
#include <thread>
#include "export.h"
#include <list> #include <list>
#include <ranges> #include <ranges>
#include <string>
#include <thread>
namespace score { namespace score {
class CRPC_EXPORT TcpServer { class CRPC_EXPORT TcpServer {
@ -22,6 +22,7 @@ class CRPC_EXPORT TcpServer {
~TcpServer() { ~TcpServer() {
stop(); stop();
std::lock_guard lock(cliThrMtx);
for (auto &thread : client_threads | std::views::keys) { for (auto &thread : client_threads | std::views::keys) {
thread.join(); thread.join();
} }
@ -43,14 +44,18 @@ class CRPC_EXPORT TcpServer {
accept_thread_ = std::jthread([this]() { accept_thread_ = std::jthread([this]() {
LOG(INFO) << "Accept thread started"; LOG(INFO) << "Accept thread started";
while (running_) { while (running_) {
std::ranges::remove_if(client_threads.begin(), client_threads.end(), [](auto& client_info) { {
std::lock_guard lock(cliThrMtx);
client_threads.remove_if([](auto &client_info) {
bool result = false; bool result = false;
if (client_info.second.wait_for(0ms) == std::future_status::ready) { if (client_info.second.wait_for(0ms) ==
std::future_status::ready) {
client_info.first.join(); client_info.first.join();
result = true; result = true;
} }
return result; return result;
}); });
}
try { try {
auto socket = std::make_shared<asio::ip::tcp::socket>( auto socket = std::make_shared<asio::ip::tcp::socket>(
io_context_); io_context_);
@ -60,10 +65,15 @@ class CRPC_EXPORT TcpServer {
<< "New connection from " << "New connection from "
<< socket->remote_endpoint().address().to_string(); << socket->remote_endpoint().address().to_string();
auto done = std::make_shared<std::promise<bool>>(); auto done = std::make_shared<std::promise<bool>>();
client_threads.push_back(std::make_pair(std::jthread([this, socket, done]() { {
std::lock_guard lock(cliThrMtx);
client_threads.push_back(std::make_pair(
std::jthread([this, socket, done]() {
handle_client(socket); handle_client(socket);
done->set_value(true); done->set_value(true);
}),done->get_future())); }),
done->get_future()));
}
} catch (const std::system_error &e) { } catch (const std::system_error &e) {
LOG(INFO) << "Accept exception: " << e.what(); LOG(INFO) << "Accept exception: " << e.what();
if (running_) { if (running_) {
@ -141,7 +151,8 @@ class CRPC_EXPORT TcpServer {
std::atomic<bool> running_; std::atomic<bool> running_;
std::list<std::pair<std::jthread, std::future<bool>>> client_threads; std::list<std::pair<std::jthread, std::future<bool>>> client_threads;
std::mutex cliThrMtx;
std::jthread accept_thread_; std::jthread accept_thread_;
}; };
} // namespace cloud_point_rpc } // namespace score

View File

@ -16,6 +16,8 @@ Base64RPCCoder::~Base64RPCCoder() = default;
* @return vector of raw bytes << allocated on encoded.size() / 4 * 3 + 1 size * @return vector of raw bytes << allocated on encoded.size() / 4 * 3 + 1 size
*/ */
std::vector<char> Base64RPCCoder::decode(const std::string& encoded) { std::vector<char> Base64RPCCoder::decode(const std::string& encoded) {
if (encoded.length() > (std::numeric_limits<size_t>::max() / 3) * 4)
throw std::length_error("Base64 input too large");
DLOG(INFO) << "Base64RPCCoder::decode"; DLOG(INFO) << "Base64RPCCoder::decode";
std::vector<char> result((encoded.length() >> 2) * 3 + 1); std::vector<char> result((encoded.length() >> 2) * 3 + 1);
size_t result_len = 0; size_t result_len = 0;
@ -30,6 +32,8 @@ std::vector<char> Base64RPCCoder::decode(const std::string& encoded) {
* @return encoded base64 string * @return encoded base64 string
*/ */
std::string Base64RPCCoder::encode(const std::vector<char>& data) { std::string Base64RPCCoder::encode(const std::vector<char>& data) {
if (data.size() > (std::numeric_limits<size_t>::max() / 4) * 3)
throw std::length_error("raw input is too large");
DLOG(INFO) << "Base64RPCCoder::encode"; DLOG(INFO) << "Base64RPCCoder::encode";
size_t result_len = 0; size_t result_len = 0;
std::string result(data.size() / 3 * 4 + 1, 0); std::string result(data.size() / 3 * 4 + 1, 0);

View File

@ -1,5 +1,7 @@
#include "cloud_point_rpc/rpc_server.hpp" #include "cloud_point_rpc/rpc_server.hpp"
#include "server_api.h"
#include <glog/logging.h> #include <glog/logging.h>
#include <variant>
using json = nlohmann::json; using json = nlohmann::json;
namespace score { namespace score {
@ -12,21 +14,58 @@ json create_error(int code, const std::string &message,
{"id", id}}; {"id", id}};
} }
json create_success(const json &result, const json &id) { struct CreateSuccess {
return {{"jsonrpc", "2.0"}, {"result", result}, {"id", id}}; json obj;
json id;
void operator()(const json &result) {
obj = {{"jsonrpc", "2.0"}, {"result", result}, {"id", id}};
} }
void operator()(const std::string &result) {
obj = {{"jsonrpc", "2.0"}, {"result", result}, {"id", id}};
}
};
} // namespace } // namespace
template <typename T> struct Deleter {
void operator()(T *element) {
(void)element;
LOG(ERROR) << "Called default deleter";
}
};
template <> struct Deleter<rpc_string> {
void operator()(rpc_string *element) {
if (element) {
crpc_str_destroy(element);
}
}
};
using rpcStringPtr = std::unique_ptr<rpc_string, Deleter<rpc_string>>;
void RpcServer::register_method(const std::string &name, Handler handler) { void RpcServer::register_method(const std::string &name, Handler handler) {
handlers_[name] = std::move(handler); handlers_[name] = std::move(handler);
} }
void RpcServer::register_method(const std::string &name, callback_t handler) { void RpcServer::register_method(const std::string &name, callback_t handler) {
handlers_[name] = [handler](const nlohmann::json& j) -> nlohmann::json { handlers_[name] = [handler](const nlohmann::json &j)
-> std::variant<nlohmann::json, std::string> {
rpc_string tmp; rpc_string tmp;
tmp.s = j.dump(); tmp.s = j.dump();
rpc_string* res = handler(&tmp); auto res = rpcStringPtr(handler(&tmp));
return {res->s}; if (!res) {
LOG(ERROR) << "Method is invalid";
return {};
}
std::variant<nlohmann::json, std::string> ret;
try {
ret = json::parse(res->s);
} catch (std::exception &e) {
DLOG(INFO) << "return value is not a json";
ret = res->s;
}
return ret;
}; };
} }
@ -59,11 +98,14 @@ std::string RpcServer::process(const std::string &request_str) {
} }
try { try {
json result = it->second(params); auto result = it->second(params);
return create_success(result, id).dump(); CreateSuccess visitor;
visitor.id = id;
std::visit(visitor, result);
return visitor.obj.dump();
} catch (const std::exception &e) { } catch (const std::exception &e) {
return create_error(-32000, e.what(), id).dump(); // Server error return create_error(-32000, e.what(), id).dump(); // Server error
} }
} }
} // namespace cloud_point_rpc } // namespace score

View File

@ -1,52 +1,73 @@
#include "server_api.h"
#include "cloud_point_rpc/config.hpp" #include "cloud_point_rpc/config.hpp"
#include "cloud_point_rpc/rpc_server.hpp" #include "cloud_point_rpc/rpc_server.hpp"
#include "cloud_point_rpc/tcp_server.hpp" #include "cloud_point_rpc/tcp_server.hpp"
#include <glog/logging.h>
#include "server_api.h"
#include <algorithm> #include <algorithm>
#include <glog/logging.h>
#include <list>
#include <memory> #include <memory>
#include <string> #include <string>
#include <list>
static std::list<std::unique_ptr<rpc_string>> gc; static std::list<std::unique_ptr<rpc_string>> gc;
std::mutex gc_mtx;
std::mutex server_mtx;
score::RpcServer rpc_server; score::RpcServer rpc_server;
std::unique_ptr<score::TcpServer> server = nullptr; std::unique_ptr<score::TcpServer> server = nullptr;
extern "C" { extern "C" {
const char *crpc_str_get_data(const rpc_string *that) { const char *crpc_str_get_data(const rpc_string *that) {
if (!that) {
LOG(ERROR) << "Tried to get data on nullptr";
return nullptr;
}
return that->s.c_str(); return that->s.c_str();
} }
uint64_t crpc_str_get_size(const rpc_string *that) { uint64_t crpc_str_get_size(const rpc_string *that) {
if (!that) {
LOG(ERROR) << "Tried to get size on nullptr";
return 0;
}
return that->s.size(); return that->s.size();
} }
rpc_string *crpc_str_create(const char *data, uint64_t size) { rpc_string *crpc_str_create(const char *data, uint64_t size) {
if (!data) {
LOG(ERROR) << "Tried to create with nullptr data";
return nullptr;
}
std::lock_guard lock(gc_mtx);
gc.push_back(std::make_unique<rpc_string>(data, size)); gc.push_back(std::make_unique<rpc_string>(data, size));
return gc.back().get(); return gc.back().get();
} }
void crpc_str_destroy(rpc_string *that) { void crpc_str_destroy(rpc_string *that) {
if (!that) {
LOG(ERROR) << "Tried to destroy on nullptr";
return;
}
std::lock_guard lock(gc_mtx);
auto it = std::ranges::find(gc, that, &std::unique_ptr<rpc_string>::get); auto it = std::ranges::find(gc, that, &std::unique_ptr<rpc_string>::get);
if (it != gc.end()) if (it != gc.end())
gc.erase(it); gc.erase(it);
} }
void crpc_init(const char *config_path) { void crpc_init(const char *config_path) {
google::InitGoogleLogging("CloudPointRPC"); google::InitGoogleLogging("CloudPointRPC");
if (config_path == nullptr) { if (config_path == nullptr) {
LOG(INFO) << "config_path was not provided"; LOG(ERROR) << "config_path was not provided";
return;
} }
try { try {
auto config = score::ConfigLoader::load(config_path); auto config = score::ConfigLoader::load(config_path);
LOG(INFO) << "Loaded config from " << config_path; LOG(INFO) << "Loaded config from " << config_path;
server = std::make_unique<score::TcpServer>(config.server.ip, config.server.port, server = std::make_unique<score::TcpServer>(
config.server.ip, config.server.port,
[&](const std::string &request) { [&](const std::string &request) {
return rpc_server.process( std::lock_guard lock(server_mtx);
request); return rpc_server.process(request);
}); });
server->start(); server->start();
} catch (const std::exception &e) { } catch (const std::exception &e) {
@ -58,11 +79,12 @@ void crpc_deinit() {
if (server) if (server)
server->join(); server->join();
server.reset(); server.reset();
std::lock_guard lock(gc_mtx);
gc.clear(); gc.clear();
} }
void crpc_add_method(callback_t cb, rpc_string *name) { void crpc_add_method(callback_t cb, rpc_string *name) {
std::lock_guard lock(server_mtx);
rpc_server.register_method(name->s, cb); rpc_server.register_method(name->s, cb);
} }
} }

View File

@ -65,6 +65,10 @@ class TestThread {
} }
} }
void add_method(const callback_t cb, rpc_string *name) { void add_method(const callback_t cb, rpc_string *name) {
if(!name || !name->s.size()) {
LOG(ERROR) << "Tried to add method with invalid name";
return;
}
LOG(INFO) << "Trying to add method: " << name->s; LOG(INFO) << "Trying to add method: " << name->s;
std::lock_guard lock(mtx); std::lock_guard lock(mtx);
if (methods.contains(name->s)) { if (methods.contains(name->s)) {
@ -76,6 +80,10 @@ class TestThread {
} }
int remove_method(const rpc_string *name) { int remove_method(const rpc_string *name) {
if(!name || !name->s.size()) {
LOG(ERROR) << "Tried to remove method with invalid name";
return -1;
}
LOG(INFO) << "Trying to remove method: " << name->s; LOG(INFO) << "Trying to remove method: " << name->s;
std::lock_guard lock(mtx); std::lock_guard lock(mtx);
int result = 0; int result = 0;
@ -90,6 +98,10 @@ class TestThread {
} }
void call(const rpc_string *name) { void call(const rpc_string *name) {
if (!name) {
LOG(ERROR) << "Called with nullptr name";
return;
}
std::lock_guard lock(mtx); std::lock_guard lock(mtx);
LOG(INFO) << server.process(name->s); LOG(INFO) << server.process(name->s);
} }
@ -166,7 +178,13 @@ int crpc_test_remove_method(rpc_string *name) {
return test.remove_method(name); return test.remove_method(name);
} }
void crpc_test_schedule_call(rpc_string *name) { test.add_queue_call(name->s); } void crpc_test_schedule_call(rpc_string *name) {
if (!name) {
LOG(ERROR) << "Called with name nullptr";
return;
}
test.add_queue_call(name->s);
}
void crpc_test_auto_call(uint32_t state) { void crpc_test_auto_call(uint32_t state) {
test.auto_call(static_cast<bool>(state)); test.auto_call(static_cast<bool>(state));

View File

@ -35,7 +35,7 @@ TEST_F(TestCApi, Base) {
task.set_value(installed); task.set_value(installed);
} }
DLOG(INFO) << "Go out"; DLOG(INFO) << "Go out";
return crpc_str_create("res", sizeof("res")); return crpc_str_create("res", sizeof("res") - 1);
}, },
&name); &name);
@ -63,14 +63,16 @@ TEST_F(TestCApi, AddedMultiple) {
} }
auto register_idx = [&]<size_t I>() { auto register_idx = [&]<size_t I>() {
crpc_test_add_method(+[](rpc_string*) -> rpc_string* { crpc_test_add_method(
+[](rpc_string *) -> rpc_string * {
static bool installed = false; static bool installed = false;
if (!installed) { if (!installed) {
installed = true; installed = true;
(*bridge)[I].set_value(true); (*bridge)[I].set_value(true);
} }
return crpc_str_create("res", sizeof("res")); return crpc_str_create("res", sizeof("res"));
}, &called[I].second); },
&called[I].second);
}; };
register_idx.template operator()<0>(); register_idx.template operator()<0>();
@ -121,14 +123,17 @@ TEST_F(TestCApi, ScheduleCall) {
} }
auto register_idx = [&]<size_t I>() { auto register_idx = [&]<size_t I>() {
crpc_test_add_method(+[](rpc_string*) -> rpc_string* { crpc_test_add_method(
+[](rpc_string *) -> rpc_string * {
static bool installed = false; static bool installed = false;
if (!installed) { if (!installed) {
installed = true; installed = true;
(*bridge)[I].set_value(true); (*bridge)[I].set_value(true);
} }
return crpc_str_create("res", sizeof("res")); std::string_view res = "res";
}, &called[I].second); return crpc_str_create(res.data(), res.size());
},
&called[I].second);
}; };
auto test_idx = [&]<size_t I>() { auto test_idx = [&]<size_t I>() {
using namespace std::chrono_literals; using namespace std::chrono_literals;
@ -163,7 +168,9 @@ TEST_F(TestCApi, String) {
name.s = "test"; name.s = "test";
EXPECT_EQ(name.s.c_str(), crpc_str_get_data(&name)); EXPECT_EQ(name.s.c_str(), crpc_str_get_data(&name));
EXPECT_EQ(name.s.size(), crpc_str_get_size(&name)); EXPECT_EQ(name.s.size(), crpc_str_get_size(&name));
std::string_view testString = "test 2222";
auto creation = crpc_str_create("test 2222", sizeof("test 2222")); auto creation =
crpc_str_create(testString.data(), testString.size());
EXPECT_EQ(std::string_view(crpc_str_get_data(creation)), testString);
EXPECT_NO_THROW(crpc_str_destroy(creation)); EXPECT_NO_THROW(crpc_str_destroy(creation));
} }