[test-c-api] appended api with helpful methods

valgrind doesn't reports any error
thread sanitizer reports one issue on test tear down (need to
investigate)
This commit is contained in:
Artur Mukhamadiev 2026-03-01 23:54:06 +03:00
parent e0ac93c657
commit f94a23b723
7 changed files with 323 additions and 74 deletions

View File

@ -11,7 +11,7 @@ extern "C" {
struct rpc_string { struct rpc_string {
rpc_string(const char* data, uint64_t size) : s(data,size) {} rpc_string(const char* data, uint64_t size) : s(data,size) {}
rpc_string() = default; rpc_string() = default;
std::string s; std::string s;
}; };
} }
@ -25,7 +25,7 @@ class CRPC_EXPORT RpcServer {
void register_method(const std::string &name, Handler handler); void register_method(const std::string &name, Handler handler);
void register_method(const std::string &name, callback_t handler); void register_method(const std::string &name, callback_t handler);
///@param request_str - json rpc 2.0 formatted string ///@param request_str json rpc 2.0 formatted string
[[nodiscard]] std::string process(const std::string &request_str); [[nodiscard]] std::string process(const std::string &request_str);
private: private:

View File

@ -8,6 +8,8 @@
#include <string> #include <string>
#include <thread> #include <thread>
#include "export.h" #include "export.h"
#include <list>
#include <ranges>
namespace cloud_point_rpc { namespace cloud_point_rpc {
class CRPC_EXPORT TcpServer { class CRPC_EXPORT TcpServer {
@ -18,9 +20,15 @@ class CRPC_EXPORT TcpServer {
: ip_(ip), port_(port), processor_(std::move(processor)), : ip_(ip), port_(port), processor_(std::move(processor)),
acceptor_(io_context_), running_(false) {} acceptor_(io_context_), running_(false) {}
~TcpServer() { stop(); } ~TcpServer() {
stop();
for (auto &thread : client_threads | std::views::keys) {
thread.join();
}
}
void start() { void start() {
using namespace std::chrono_literals;
try { try {
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_), asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_),
port_); port_);
@ -35,6 +43,14 @@ class CRPC_EXPORT TcpServer {
accept_thread_ = std::jthread([this]() { accept_thread_ = std::jthread([this]() {
LOG(INFO) << "Accept thread started"; LOG(INFO) << "Accept thread started";
while (running_) { while (running_) {
std::ranges::remove_if(client_threads.begin(), client_threads.end(), [](auto& client_info) {
bool result = false;
if (client_info.second.wait_for(0ms) == std::future_status::ready) {
client_info.first.join();
result = true;
}
return result;
});
try { try {
auto socket = std::make_shared<asio::ip::tcp::socket>( auto socket = std::make_shared<asio::ip::tcp::socket>(
io_context_); io_context_);
@ -43,10 +59,11 @@ class CRPC_EXPORT TcpServer {
LOG(INFO) LOG(INFO)
<< "New connection from " << "New connection from "
<< socket->remote_endpoint().address().to_string(); << socket->remote_endpoint().address().to_string();
auto done = std::make_shared<std::promise<bool>>();
std::jthread([this, socket]() { client_threads.push_back(std::make_pair(std::jthread([this, socket, done]() {
handle_client(socket); handle_client(socket);
}).detach(); done->set_value(true);
}),done->get_future()));
} catch (const std::system_error &e) { } catch (const std::system_error &e) {
LOG(INFO) << "Accept exception: " << e.what(); LOG(INFO) << "Accept exception: " << e.what();
if (running_) { if (running_) {
@ -123,6 +140,7 @@ class CRPC_EXPORT TcpServer {
asio::ip::tcp::acceptor acceptor_; asio::ip::tcp::acceptor acceptor_;
std::atomic<bool> running_; std::atomic<bool> running_;
std::list<std::pair<std::jthread, std::future<bool>>> client_threads;
std::jthread accept_thread_; std::jthread accept_thread_;
}; };

View File

@ -13,7 +13,7 @@ extern "C" {
* struct rpc_string { * struct rpc_string {
* std::string s; * std::string s;
* }; * };
* has internal gc and would be automatically deallocated on deinit call * has internal gc and would be automatically deallocated on deinit call,
* but it is better to call destroy manually, to prevent exceeding memory usage * but it is better to call destroy manually, to prevent exceeding memory usage
*/ */
struct CRPC_EXPORT rpc_string; struct CRPC_EXPORT rpc_string;

View File

@ -7,14 +7,42 @@
extern "C" { extern "C" {
#endif //cpp #endif //cpp
struct CRPC_EXPORT rpc_string; struct CRPC_EXPORT rpc_string;
/// @brief callback function intendent to handle json and retrieve string data
typedef rpc_string*(*callback_t)(rpc_string*); typedef rpc_string*(*callback_t)(rpc_string*);
/// @brief call it to initialize library thread;
/// by default would call available methods one by one
CRPC_EXPORT void crpc_test_init(); CRPC_EXPORT void crpc_test_init();
/// @brief please, call it, when you're done working with the lib
/// @note blocking operation
CRPC_EXPORT void crpc_test_deinit(); CRPC_EXPORT void crpc_test_deinit();
/// @brief crpc_test_add_method will add to the available set/map
/// @note blocking operation
CRPC_EXPORT void crpc_test_add_method(callback_t cb, rpc_string* name); CRPC_EXPORT void crpc_test_add_method(callback_t cb, rpc_string* name);
/// @brief crpc_test_remove_method would
/// @param name pointer to name string; memory management not transferred
CRPC_EXPORT int crpc_test_remove_method(rpc_string* name);
/// @brief crpc_schedule_call would add to the execution queue
/// may be called one after current
/// @note blocking operation
CRPC_EXPORT void crpc_test_schedule_call(rpc_string* name);
/// @brief crpc_test_change_duration will change duration between regular calls
/// @note blocking operation
CRPC_EXPORT void crpc_test_change_duration(uint64_t duration_ms);
/// @note blocking operation
CRPC_EXPORT uint64_t crpc_test_duration();
/// @brief crpc_test_auto_call change internal state of auto call
/// @param state 0 - turn off auto call on sleep; 1 - turn on auto call on sleep
CRPC_EXPORT void crpc_test_auto_call(uint32_t state);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif //cpp #endif //cpp

View File

@ -1,15 +1,18 @@
#include "test_api.h" #include "test_api.h"
#include "server_api.h"
#include "cloud_point_rpc/rpc_server.hpp" #include "cloud_point_rpc/rpc_server.hpp"
#include <glog/logging.h>
#include <mutex>
#include <stop_token>
#include <thread>
#include <list>
#include "server_api.h" #include "server_api.h"
#include <condition_variable>
#include <glog/logging.h>
#include <list>
#include <mutex>
#include <queue>
#include <set>
#include <stop_token>
#include <thread>
class TestThread { class TestThread {
std::string make_jsonrpc(const std::string& method_name) { static std::string make_jsonrpc(const std::string &method_name) {
return std::format(R"( return std::format(R"(
{{ {{
"jsonrpc": "2.0", "jsonrpc": "2.0",
@ -17,67 +20,125 @@ class TestThread {
"params": {{}}, "params": {{}},
"id": 1 "id": 1
}} }}
)", method_name); )",
} method_name);
public:
TestThread() = default;
~TestThread() {
join();
} }
public:
TestThread() = default;
~TestThread() { join(); }
void routine() { void routine() {
size_t distance{0};
std::unique_lock lock(mtx); std::unique_lock lock(mtx);
std::stop_token stoken = thr.get_stop_token(); const std::stop_token stoken = thr.get_stop_token();
lock.unlock(); lock.unlock();
while(!stoken.stop_requested()) { while (!stoken.stop_requested()) {
lock.lock(); lock.lock();
DLOG(INFO) << methods.size() << " ; Size of methods"; if (!calls_queue.empty()) {
if(!methods.empty()) { auto front = calls_queue.front();
DLOG(INFO) << front << " will try to call";
calls_queue.pop();
LOG(INFO) << server.process(make_jsonrpc(front));
} else if (state.load() && !methods.empty()) {
auto it = methods.begin(); auto it = methods.begin();
std::advance(it, distance % methods.size()); std::advance(it, distance++ % methods.size());
DLOG(INFO) << *it << " : Method at this position"; DLOG(INFO) << *it << " : Method at this position";
LOG(INFO) << server.process(make_jsonrpc(*it)); LOG(INFO) << server.process(make_jsonrpc(*it));
} }
lock.unlock();
using namespace std::chrono_literals; if (state.load() && calls_queue.empty())
std::this_thread::sleep_for(100ms); cv.wait_for(lock, thr_sleep, [&] {
return stoken.stop_requested();
});
lock.unlock();
} }
LOG(INFO) << "Stopped"; LOG(INFO) << "Stopped";
} }
void start() {
thr = std::jthread(&TestThread::routine, this); void start() { thr = std::jthread(&TestThread::routine, this); }
}
void join() { void join() {
DLOG(INFO) << "Requested thread stop"; if (thr.joinable()) {
thr.request_stop(); DLOG(INFO) << "Requested thread stop";
if(thr.joinable()) { thr.request_stop();
cv.notify_one();
thr.join(); thr.join();
} }
} }
void add_method(callback_t cb, rpc_string* name) { void add_method(const callback_t cb, rpc_string *name) {
LOG(INFO) << "Trying to add method: " << name->s; LOG(INFO) << "Trying to add method: " << name->s;
std::lock_guard lock(mtx); std::lock_guard lock(mtx);
methods.emplace_back(name->s); if (methods.contains(name->s)) {
server.register_method(name->s,cb); LOG(INFO) << "Method already exists: " << name->s;
return;
}
methods.emplace(name->s);
server.register_method(name->s, cb);
} }
void call(rpc_string* name) { int remove_method(const rpc_string *name) {
LOG(INFO) << "Trying to remove method: " << name->s;
std::lock_guard lock(mtx);
int result = 0;
auto it = std::find(methods.begin(), methods.end(), name->s);
if (it != methods.end()) {
methods.erase(it);
} else {
LOG(ERROR) << "Method not found: " << name->s;
result = -1;
}
return result;
}
void call(const rpc_string *name) {
std::lock_guard lock(mtx); std::lock_guard lock(mtx);
LOG(INFO) << server.process(name->s); LOG(INFO) << server.process(name->s);
} }
private:
std::list<std::string> methods; void set_duration(uint64_t duration_ms) {
size_t distance; LOG(INFO) << "Trying to install sleep duration: " << duration_ms;
std::lock_guard lock(mtx);
thr_sleep = std::chrono::milliseconds(duration_ms);
}
uint64_t get_duration() {
std::lock_guard lock(mtx);
return thr_sleep.count();
}
void add_queue_call(const std::string &name) {
std::lock_guard lock(mtx);
calls_queue.emplace(name);
}
void auto_call(bool state) {
this->state.store(state, std::memory_order_relaxed);
}
void reset() {
std::lock_guard lock(mtx);
calls_queue = std::queue<std::string>();
methods.clear();
state.store(true, std::memory_order_relaxed);
server = cloud_point_rpc::RpcServer();
}
private:
std::atomic<bool> state{true};
std::condition_variable cv;
std::queue<std::string> calls_queue{};
std::set<std::string> methods{};
std::jthread thr; std::jthread thr;
std::mutex mtx; std::mutex mtx;
cloud_point_rpc::RpcServer server; cloud_point_rpc::RpcServer server;
std::chrono::duration<int64_t, std::milli> thr_sleep{50};
} test; } test;
extern "C" { extern "C" {
void crpc_test_init() { void crpc_test_init() {
if(!google::IsGoogleLoggingInitialized()) if (!google::IsGoogleLoggingInitialized())
google::InitGoogleLogging("TestRPC"); google::InitGoogleLogging("TestRPC");
try { try {
test.start(); test.start();
@ -88,10 +149,26 @@ void crpc_test_init() {
void crpc_test_deinit() { void crpc_test_deinit() {
test.join(); test.join();
crpc_deinit(); crpc_deinit();
test.reset();
} }
void crpc_test_add_method(callback_t cb, rpc_string* name){ void crpc_test_add_method(callback_t cb, rpc_string *name) {
test.add_method(cb, name); test.add_method(cb, name);
} }
void crpc_test_change_duration(uint64_t duration_ms) {
test.set_duration(duration_ms);
}
uint64_t crpc_test_duration() { return test.get_duration(); }
int crpc_test_remove_method(rpc_string *name) {
return test.remove_method(name);
}
void crpc_test_schedule_call(rpc_string *name) { test.add_queue_call(name->s); }
void crpc_test_auto_call(uint32_t state) {
test.auto_call(static_cast<bool>(state));
}
} }

View File

@ -1,45 +1,169 @@
#include "cloud_point_rpc/rpc_server.hpp"
#include "server_api.h"
#include "test_api.h"
#include <future> #include <future>
#include <glog/logging.h>
#include <gmock/gmock.h> #include <gmock/gmock.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <glog/logging.h>
#include "cloud_point_rpc/rpc_server.hpp"
#include "test_api.h"
#include "server_api.h"
class TestCApi : public ::testing::Test { class TestCApi : public ::testing::Test {
protected: protected:
void SetUp() override { void SetUp() override {
FLAGS_logtostderr = true;
if (!google::IsGoogleLoggingInitialized())
google::InitGoogleLogging("TestRPC");
EXPECT_NO_THROW(crpc_test_init()); EXPECT_NO_THROW(crpc_test_init());
} }
void TearDown() override { void TearDown() override { crpc_test_deinit(); }
crpc_test_deinit();
}
}; };
TEST_F(TestCApi, Base) { TEST_F(TestCApi, Base) {
FLAGS_logtostderr = 1;
rpc_string name; rpc_string name;
name.s = "test"; name.s = "test";
static std::promise<bool> task; static std::promise<bool> task;
std::future<bool> called = task.get_future(); std::future<bool> called = task.get_future();
LOG(INFO) << "Test"; DLOG(INFO) << "Test";
crpc_test_add_method(+[](rpc_string*)-> rpc_string* { crpc_test_add_method(
static bool installed = false; +[](rpc_string *) -> rpc_string * {
DLOG(INFO) << "Trying to do something"; static bool installed = false;
if(!installed){ DLOG(INFO) << "Trying to do something";
LOG(INFO) << "Trying to install"; if (!installed) {
installed = true; DLOG(INFO) << "Trying to install";
task.set_value(installed); installed = true;
} task.set_value(installed);
DLOG(INFO) << "Go out"; }
return crpc_str_create("res", sizeof("res")); DLOG(INFO) << "Go out";
}, &name); return crpc_str_create("res", sizeof("res"));
},
&name);
using namespace std::chrono_literals; using namespace std::chrono_literals;
called.wait_for(500ms); const std::future_status res = called.wait_for(500ms);
EXPECT_NE(res, std::future_status::timeout);
EXPECT_EQ(called.valid(), true);
EXPECT_EQ(called.get(), true); EXPECT_EQ(called.get(), true);
LOG(INFO) << "DONE"; DLOG(INFO) << "DONE";
}
TEST_F(TestCApi, AddedMultiple) {
constexpr int N = 4;
std::array<std::promise<bool>, N> tasks;
std::array<std::pair<std::future<bool>, rpc_string>, N> called;
// The Bridge: A static pointer local to this test function
static std::array<std::promise<bool>, N>* bridge;
bridge = &tasks;
for (int i = 0; i < N; i++) {
called[i].first = tasks[i].get_future();
std::string n = "test" + std::to_string(i);
called[i].second = rpc_string{n.c_str(), n.size()};
}
auto register_idx = [&]<size_t I>() {
crpc_test_add_method(+[](rpc_string*) -> rpc_string* {
static bool installed = false;
if (!installed) {
installed = true;
(*bridge)[I].set_value(true);
}
return crpc_str_create("res", sizeof("res"));
}, &called[I].second);
};
register_idx.template operator()<0>();
register_idx.template operator()<1>();
register_idx.template operator()<2>();
register_idx.template operator()<3>();
auto test_idx = [&]<size_t I>() {
using namespace std::chrono_literals;
const std::future_status res = called[I].first.wait_for(1000ms);
EXPECT_NE(res, std::future_status::timeout);
EXPECT_EQ(called[I].first.get(), true);
LOG(INFO) << "Done" << "; task=" << I;
};
test_idx.template operator()<0>();
test_idx.template operator()<1>();
test_idx.template operator()<2>();
test_idx.template operator()<3>();
}
TEST_F(TestCApi, RemoveMethod) {
rpc_string name{"test", sizeof("test") - 1};
EXPECT_EQ(crpc_test_remove_method(&name), -1);
}
TEST_F(TestCApi, ChangeDuration) {
constexpr int dur = 1e3;
EXPECT_NO_THROW(crpc_test_change_duration(dur));
EXPECT_EQ(crpc_test_duration(), dur);
LOG(INFO) << "Test";
}
TEST_F(TestCApi, ScheduleCall) {
EXPECT_NO_THROW(crpc_test_auto_call(0));
constexpr int N = 4;
std::array<std::promise<bool>, N> tasks;
std::array<std::pair<std::future<bool>, rpc_string>, N> called;
// The Bridge: A static pointer local to this test function
static std::array<std::promise<bool>, N>* bridge;
bridge = &tasks;
LOG(INFO) << "Started Schedule Call";
for (int i = 0; i < N; i++) {
called[i].first = tasks[i].get_future();
std::string n = "test" + std::to_string(i);
called[i].second = rpc_string{n.c_str(), n.size()};
}
auto register_idx = [&]<size_t I>() {
crpc_test_add_method(+[](rpc_string*) -> rpc_string* {
static bool installed = false;
if (!installed) {
installed = true;
(*bridge)[I].set_value(true);
}
return crpc_str_create("res", sizeof("res"));
}, &called[I].second);
};
auto test_idx = [&]<size_t I>() {
using namespace std::chrono_literals;
const std::future_status res = called[I].first.wait_for(500ms);
EXPECT_NE(res, std::future_status::timeout);
EXPECT_EQ(called[I].first.get(), true);
};
auto schedule_call = []<size_t I>() {
rpc_string test{"test", sizeof("test") - 1};
test.s += std::to_string(I);
LOG(INFO) << "Trying to add :" << test.s;
crpc_test_schedule_call(&test);
};
register_idx.template operator()<0>();
register_idx.template operator()<1>();
register_idx.template operator()<2>();
register_idx.template operator()<3>();
schedule_call.template operator()<0>();
schedule_call.template operator()<1>();
schedule_call.template operator()<2>();
schedule_call.template operator()<3>();
test_idx.template operator()<0>();
test_idx.template operator()<1>();
test_idx.template operator()<2>();
test_idx.template operator()<3>();
}
TEST_F(TestCApi, String) {
rpc_string name;
name.s = "test";
EXPECT_EQ(name.s.c_str(), crpc_str_get_data(&name));
EXPECT_EQ(name.s.size(), crpc_str_get_size(&name));
auto creation = crpc_str_create("test 2222", sizeof("test 2222"));
EXPECT_NO_THROW(crpc_str_destroy(creation));
} }

View File

@ -12,21 +12,23 @@
using namespace cloud_point_rpc; using namespace cloud_point_rpc;
class CliTest : public ::testing::Test { class CliTest : public ::testing::Test {
public:
void start() {
tcp_server->start();
}
protected: protected:
void SetUp() override { void SetUp() override {
server_ip = "127.0.0.1"; server_ip = "127.0.0.1";
server_port = 9096; server_port = 9096;
rpc_server = std::make_unique<RpcServer>(); rpc_server = std::make_unique<RpcServer>();
rpc_server->register_method(
"hello", [](const nlohmann::json &) { return "world"; });
tcp_server = std::make_unique<TcpServer>( tcp_server = std::make_unique<TcpServer>(
server_ip, server_port, [this](const std::string &req) { server_ip, server_port, [this](const std::string &req) {
return rpc_server->process(req); return rpc_server->process(req);
}); });
tcp_server->start();
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }
@ -49,7 +51,7 @@ TEST_F(CliTest, SendsInputToServerAndReceivesResponse) {
"get-intrinsic-params", [](const nlohmann::json &) { "get-intrinsic-params", [](const nlohmann::json &) {
return std::vector<double>{14589.0, 22489.0, 3123124.555}; return std::vector<double>{14589.0, 22489.0, 3123124.555};
}); });
this->start();
input << "1" << std::endl; input << "1" << std::endl;
input << "0" << std::endl; input << "0" << std::endl;