diff --git a/include/cloud_point_rpc/rpc_server.hpp b/include/cloud_point_rpc/rpc_server.hpp index 4d03079..c52c34b 100644 --- a/include/cloud_point_rpc/rpc_server.hpp +++ b/include/cloud_point_rpc/rpc_server.hpp @@ -11,7 +11,7 @@ extern "C" { struct rpc_string { rpc_string(const char* data, uint64_t size) : s(data,size) {} rpc_string() = default; - + std::string s; }; } @@ -25,7 +25,7 @@ class CRPC_EXPORT RpcServer { void register_method(const std::string &name, Handler handler); void register_method(const std::string &name, callback_t handler); - ///@param request_str - json rpc 2.0 formatted string + ///@param request_str json rpc 2.0 formatted string [[nodiscard]] std::string process(const std::string &request_str); private: diff --git a/include/cloud_point_rpc/tcp_server.hpp b/include/cloud_point_rpc/tcp_server.hpp index 8b8d3dc..0adb1ff 100644 --- a/include/cloud_point_rpc/tcp_server.hpp +++ b/include/cloud_point_rpc/tcp_server.hpp @@ -8,6 +8,8 @@ #include #include #include "export.h" +#include +#include namespace cloud_point_rpc { class CRPC_EXPORT TcpServer { @@ -18,9 +20,15 @@ class CRPC_EXPORT TcpServer { : ip_(ip), port_(port), processor_(std::move(processor)), acceptor_(io_context_), running_(false) {} - ~TcpServer() { stop(); } + ~TcpServer() { + stop(); + for (auto &thread : client_threads | std::views::keys) { + thread.join(); + } + } void start() { + using namespace std::chrono_literals; try { asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_), port_); @@ -35,6 +43,14 @@ class CRPC_EXPORT TcpServer { accept_thread_ = std::jthread([this]() { LOG(INFO) << "Accept thread started"; while (running_) { + std::ranges::remove_if(client_threads.begin(), client_threads.end(), [](auto& client_info) { + bool result = false; + if (client_info.second.wait_for(0ms) == std::future_status::ready) { + client_info.first.join(); + result = true; + } + return result; + }); try { auto socket = std::make_shared( io_context_); @@ -43,10 +59,11 @@ class CRPC_EXPORT TcpServer { LOG(INFO) << "New connection from " << socket->remote_endpoint().address().to_string(); - - std::jthread([this, socket]() { + auto done = std::make_shared>(); + client_threads.push_back(std::make_pair(std::jthread([this, socket, done]() { handle_client(socket); - }).detach(); + done->set_value(true); + }),done->get_future())); } catch (const std::system_error &e) { LOG(INFO) << "Accept exception: " << e.what(); if (running_) { @@ -123,6 +140,7 @@ class CRPC_EXPORT TcpServer { asio::ip::tcp::acceptor acceptor_; std::atomic running_; + std::list>> client_threads; std::jthread accept_thread_; }; diff --git a/include/server_api.h b/include/server_api.h index 9679c68..2dde8e1 100644 --- a/include/server_api.h +++ b/include/server_api.h @@ -13,7 +13,7 @@ extern "C" { * struct rpc_string { * std::string s; * }; - * has internal gc and would be automatically deallocated on deinit call + * has internal gc and would be automatically deallocated on deinit call, * but it is better to call destroy manually, to prevent exceeding memory usage */ struct CRPC_EXPORT rpc_string; diff --git a/include/test_api.h b/include/test_api.h index b208c2b..4382b7c 100644 --- a/include/test_api.h +++ b/include/test_api.h @@ -7,14 +7,42 @@ extern "C" { #endif //cpp -struct CRPC_EXPORT rpc_string; +struct CRPC_EXPORT rpc_string; + +/// @brief callback function intendent to handle json and retrieve string data typedef rpc_string*(*callback_t)(rpc_string*); +/// @brief call it to initialize library thread; +/// by default would call available methods one by one CRPC_EXPORT void crpc_test_init(); + +/// @brief please, call it, when you're done working with the lib +/// @note blocking operation CRPC_EXPORT void crpc_test_deinit(); +/// @brief crpc_test_add_method will add to the available set/map +/// @note blocking operation CRPC_EXPORT void crpc_test_add_method(callback_t cb, rpc_string* name); +/// @brief crpc_test_remove_method would +/// @param name pointer to name string; memory management not transferred +CRPC_EXPORT int crpc_test_remove_method(rpc_string* name); + +/// @brief crpc_schedule_call would add to the execution queue +/// may be called one after current +/// @note blocking operation +CRPC_EXPORT void crpc_test_schedule_call(rpc_string* name); + +/// @brief crpc_test_change_duration will change duration between regular calls +/// @note blocking operation +CRPC_EXPORT void crpc_test_change_duration(uint64_t duration_ms); + +/// @note blocking operation +CRPC_EXPORT uint64_t crpc_test_duration(); + +/// @brief crpc_test_auto_call change internal state of auto call +/// @param state 0 - turn off auto call on sleep; 1 - turn on auto call on sleep +CRPC_EXPORT void crpc_test_auto_call(uint32_t state); #ifdef __cplusplus } #endif //cpp diff --git a/src/test_api.cpp b/src/test_api.cpp index 65cd662..166e426 100644 --- a/src/test_api.cpp +++ b/src/test_api.cpp @@ -1,15 +1,18 @@ #include "test_api.h" -#include "server_api.h" #include "cloud_point_rpc/rpc_server.hpp" -#include -#include -#include -#include -#include #include "server_api.h" +#include +#include +#include +#include +#include +#include +#include +#include + class TestThread { - std::string make_jsonrpc(const std::string& method_name) { + static std::string make_jsonrpc(const std::string &method_name) { return std::format(R"( {{ "jsonrpc": "2.0", @@ -17,67 +20,125 @@ class TestThread { "params": {{}}, "id": 1 }} - )", method_name); - } -public: - TestThread() = default; - ~TestThread() { - join(); + )", + method_name); } + public: + TestThread() = default; + ~TestThread() { join(); } + void routine() { + size_t distance{0}; std::unique_lock lock(mtx); - std::stop_token stoken = thr.get_stop_token(); + const std::stop_token stoken = thr.get_stop_token(); lock.unlock(); - while(!stoken.stop_requested()) { + while (!stoken.stop_requested()) { lock.lock(); - DLOG(INFO) << methods.size() << " ; Size of methods"; - if(!methods.empty()) { + if (!calls_queue.empty()) { + auto front = calls_queue.front(); + DLOG(INFO) << front << " will try to call"; + calls_queue.pop(); + LOG(INFO) << server.process(make_jsonrpc(front)); + } else if (state.load() && !methods.empty()) { auto it = methods.begin(); - std::advance(it, distance % methods.size()); + std::advance(it, distance++ % methods.size()); DLOG(INFO) << *it << " : Method at this position"; LOG(INFO) << server.process(make_jsonrpc(*it)); } - lock.unlock(); - using namespace std::chrono_literals; - std::this_thread::sleep_for(100ms); + if (state.load() && calls_queue.empty()) + cv.wait_for(lock, thr_sleep, [&] { + return stoken.stop_requested(); + }); + + lock.unlock(); } LOG(INFO) << "Stopped"; } - void start() { - thr = std::jthread(&TestThread::routine, this); - } + + void start() { thr = std::jthread(&TestThread::routine, this); } void join() { - DLOG(INFO) << "Requested thread stop"; - thr.request_stop(); - if(thr.joinable()) { + if (thr.joinable()) { + DLOG(INFO) << "Requested thread stop"; + thr.request_stop(); + cv.notify_one(); thr.join(); } } - void add_method(callback_t cb, rpc_string* name) { + void add_method(const callback_t cb, rpc_string *name) { LOG(INFO) << "Trying to add method: " << name->s; std::lock_guard lock(mtx); - methods.emplace_back(name->s); - server.register_method(name->s,cb); + if (methods.contains(name->s)) { + LOG(INFO) << "Method already exists: " << name->s; + return; + } + methods.emplace(name->s); + server.register_method(name->s, cb); } - void call(rpc_string* name) { + int remove_method(const rpc_string *name) { + LOG(INFO) << "Trying to remove method: " << name->s; + std::lock_guard lock(mtx); + int result = 0; + auto it = std::find(methods.begin(), methods.end(), name->s); + if (it != methods.end()) { + methods.erase(it); + } else { + LOG(ERROR) << "Method not found: " << name->s; + result = -1; + } + return result; + } + + void call(const rpc_string *name) { std::lock_guard lock(mtx); LOG(INFO) << server.process(name->s); } -private: - std::list methods; - size_t distance; + + void set_duration(uint64_t duration_ms) { + LOG(INFO) << "Trying to install sleep duration: " << duration_ms; + std::lock_guard lock(mtx); + thr_sleep = std::chrono::milliseconds(duration_ms); + } + + uint64_t get_duration() { + std::lock_guard lock(mtx); + return thr_sleep.count(); + } + + void add_queue_call(const std::string &name) { + std::lock_guard lock(mtx); + calls_queue.emplace(name); + } + + void auto_call(bool state) { + this->state.store(state, std::memory_order_relaxed); + } + + void reset() { + std::lock_guard lock(mtx); + calls_queue = std::queue(); + methods.clear(); + state.store(true, std::memory_order_relaxed); + server = cloud_point_rpc::RpcServer(); + } + + private: + std::atomic state{true}; + std::condition_variable cv; + std::queue calls_queue{}; + std::set methods{}; std::jthread thr; std::mutex mtx; cloud_point_rpc::RpcServer server; + std::chrono::duration thr_sleep{50}; } test; extern "C" { void crpc_test_init() { - if(!google::IsGoogleLoggingInitialized()) + if (!google::IsGoogleLoggingInitialized()) google::InitGoogleLogging("TestRPC"); try { test.start(); @@ -88,10 +149,26 @@ void crpc_test_init() { void crpc_test_deinit() { test.join(); crpc_deinit(); + test.reset(); } -void crpc_test_add_method(callback_t cb, rpc_string* name){ +void crpc_test_add_method(callback_t cb, rpc_string *name) { test.add_method(cb, name); } +void crpc_test_change_duration(uint64_t duration_ms) { + test.set_duration(duration_ms); +} + +uint64_t crpc_test_duration() { return test.get_duration(); } + +int crpc_test_remove_method(rpc_string *name) { + return test.remove_method(name); +} + +void crpc_test_schedule_call(rpc_string *name) { test.add_queue_call(name->s); } + +void crpc_test_auto_call(uint32_t state) { + test.auto_call(static_cast(state)); +} } \ No newline at end of file diff --git a/tests/test_c_api.cpp b/tests/test_c_api.cpp index 10eb2d4..106d59a 100644 --- a/tests/test_c_api.cpp +++ b/tests/test_c_api.cpp @@ -1,45 +1,169 @@ +#include "cloud_point_rpc/rpc_server.hpp" +#include "server_api.h" +#include "test_api.h" #include +#include #include #include -#include -#include "cloud_point_rpc/rpc_server.hpp" -#include "test_api.h" -#include "server_api.h" + class TestCApi : public ::testing::Test { -protected: + protected: void SetUp() override { + FLAGS_logtostderr = true; + if (!google::IsGoogleLoggingInitialized()) + google::InitGoogleLogging("TestRPC"); EXPECT_NO_THROW(crpc_test_init()); } - void TearDown() override { - crpc_test_deinit(); - } + void TearDown() override { crpc_test_deinit(); } }; TEST_F(TestCApi, Base) { - FLAGS_logtostderr = 1; - rpc_string name; + + rpc_string name; name.s = "test"; static std::promise task; std::future called = task.get_future(); - LOG(INFO) << "Test"; - crpc_test_add_method(+[](rpc_string*)-> rpc_string* { - static bool installed = false; - DLOG(INFO) << "Trying to do something"; - if(!installed){ - LOG(INFO) << "Trying to install"; - installed = true; - task.set_value(installed); - } - DLOG(INFO) << "Go out"; - return crpc_str_create("res", sizeof("res")); - }, &name); + DLOG(INFO) << "Test"; + crpc_test_add_method( + +[](rpc_string *) -> rpc_string * { + static bool installed = false; + DLOG(INFO) << "Trying to do something"; + if (!installed) { + DLOG(INFO) << "Trying to install"; + installed = true; + task.set_value(installed); + } + DLOG(INFO) << "Go out"; + return crpc_str_create("res", sizeof("res")); + }, + &name); using namespace std::chrono_literals; - called.wait_for(500ms); - - EXPECT_EQ(called.valid(), true); + const std::future_status res = called.wait_for(500ms); + EXPECT_NE(res, std::future_status::timeout); EXPECT_EQ(called.get(), true); - LOG(INFO) << "DONE"; + DLOG(INFO) << "DONE"; +} + +TEST_F(TestCApi, AddedMultiple) { + constexpr int N = 4; + std::array, N> tasks; + std::array, rpc_string>, N> called; + + // The Bridge: A static pointer local to this test function + static std::array, N>* bridge; + bridge = &tasks; + + for (int i = 0; i < N; i++) { + called[i].first = tasks[i].get_future(); + std::string n = "test" + std::to_string(i); + called[i].second = rpc_string{n.c_str(), n.size()}; + } + + auto register_idx = [&]() { + crpc_test_add_method(+[](rpc_string*) -> rpc_string* { + static bool installed = false; + if (!installed) { + installed = true; + (*bridge)[I].set_value(true); + } + return crpc_str_create("res", sizeof("res")); + }, &called[I].second); + }; + + register_idx.template operator()<0>(); + register_idx.template operator()<1>(); + register_idx.template operator()<2>(); + register_idx.template operator()<3>(); + + auto test_idx = [&]() { + using namespace std::chrono_literals; + const std::future_status res = called[I].first.wait_for(1000ms); + EXPECT_NE(res, std::future_status::timeout); + EXPECT_EQ(called[I].first.get(), true); + LOG(INFO) << "Done" << "; task=" << I; + }; + + test_idx.template operator()<0>(); + test_idx.template operator()<1>(); + test_idx.template operator()<2>(); + test_idx.template operator()<3>(); +} + +TEST_F(TestCApi, RemoveMethod) { + rpc_string name{"test", sizeof("test") - 1}; + EXPECT_EQ(crpc_test_remove_method(&name), -1); +} + +TEST_F(TestCApi, ChangeDuration) { + constexpr int dur = 1e3; + EXPECT_NO_THROW(crpc_test_change_duration(dur)); + EXPECT_EQ(crpc_test_duration(), dur); + LOG(INFO) << "Test"; +} + +TEST_F(TestCApi, ScheduleCall) { + EXPECT_NO_THROW(crpc_test_auto_call(0)); + constexpr int N = 4; + std::array, N> tasks; + std::array, rpc_string>, N> called; + + // The Bridge: A static pointer local to this test function + static std::array, N>* bridge; + bridge = &tasks; + LOG(INFO) << "Started Schedule Call"; + for (int i = 0; i < N; i++) { + called[i].first = tasks[i].get_future(); + std::string n = "test" + std::to_string(i); + called[i].second = rpc_string{n.c_str(), n.size()}; + } + + auto register_idx = [&]() { + crpc_test_add_method(+[](rpc_string*) -> rpc_string* { + static bool installed = false; + if (!installed) { + installed = true; + (*bridge)[I].set_value(true); + } + return crpc_str_create("res", sizeof("res")); + }, &called[I].second); + }; + auto test_idx = [&]() { + using namespace std::chrono_literals; + const std::future_status res = called[I].first.wait_for(500ms); + EXPECT_NE(res, std::future_status::timeout); + EXPECT_EQ(called[I].first.get(), true); + }; + + auto schedule_call = []() { + rpc_string test{"test", sizeof("test") - 1}; + test.s += std::to_string(I); + LOG(INFO) << "Trying to add :" << test.s; + crpc_test_schedule_call(&test); + }; + + register_idx.template operator()<0>(); + register_idx.template operator()<1>(); + register_idx.template operator()<2>(); + register_idx.template operator()<3>(); + schedule_call.template operator()<0>(); + schedule_call.template operator()<1>(); + schedule_call.template operator()<2>(); + schedule_call.template operator()<3>(); + test_idx.template operator()<0>(); + test_idx.template operator()<1>(); + test_idx.template operator()<2>(); + test_idx.template operator()<3>(); +} + +TEST_F(TestCApi, String) { + rpc_string name; + name.s = "test"; + EXPECT_EQ(name.s.c_str(), crpc_str_get_data(&name)); + EXPECT_EQ(name.s.size(), crpc_str_get_size(&name)); + + auto creation = crpc_str_create("test 2222", sizeof("test 2222")); + EXPECT_NO_THROW(crpc_str_destroy(creation)); } \ No newline at end of file diff --git a/tests/test_cli.cpp b/tests/test_cli.cpp index 57f8236..e7897f0 100644 --- a/tests/test_cli.cpp +++ b/tests/test_cli.cpp @@ -12,21 +12,23 @@ using namespace cloud_point_rpc; class CliTest : public ::testing::Test { +public: + void start() { + tcp_server->start(); + } + protected: void SetUp() override { server_ip = "127.0.0.1"; server_port = 9096; rpc_server = std::make_unique(); - rpc_server->register_method( - "hello", [](const nlohmann::json &) { return "world"; }); tcp_server = std::make_unique( server_ip, server_port, [this](const std::string &req) { return rpc_server->process(req); }); - tcp_server->start(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } @@ -49,7 +51,7 @@ TEST_F(CliTest, SendsInputToServerAndReceivesResponse) { "get-intrinsic-params", [](const nlohmann::json &) { return std::vector{14589.0, 22489.0, 3123124.555}; }); - + this->start(); input << "1" << std::endl; input << "0" << std::endl;