diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..d2e97bf --- /dev/null +++ b/.clang-format @@ -0,0 +1,7 @@ +BasedOnStyle: LLVM +IndentWidth: 4 + +AlignConsecutiveAssignments: + Enabled: true + AcrossEmptyLines: true + AcrossComments: false \ No newline at end of file diff --git a/README.md b/README.md index 46dc8ba..6e33d42 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,77 @@ Communication JSON RPC protocol and implementation with Unity Scene. See [API.md](API.md) for detailed request/response formats. +## Comm model + +```plantuml +@startuml +box ClientProcess #LightBlue +Participant Caller +Participant CloudPointClient +Participant TCPClient +end box +box UnityProcess #LightGreen +Participant TCPServer +Participant CloudPointServer +Participant UnityWorld +end box +UnityWorld -> CloudPointServer : init thread +activate CloudPointServer +CloudPointServer -> TCPServer : await for connection +activate TCPServer +->CloudPointClient : init thread +activate CloudPointClient +CloudPointClient -> TCPClient : createConnection +TCPClient -> TCPServer : establish connection +TCPServer -> CloudPointServer : established +deactivate TCPServer +CloudPointServer -> TCPServer : await for calls + +TCPServer -> TCPServer : await for packet +Caller -> CloudPointClient : I want something +activate CloudPointClient +CloudPointClient -> CloudPointClient : CallMethod +CloudPointClient -> TCPClient : send(message) +activate TCPClient +TCPClient -> TCPServer : packet send +TCPServer -> TCPServer : await for packet +activate TCPServer +TCPServer -> TCPServer : read packet +TCPServer -> TCPClient : packet read +TCPClient -> CloudPointClient : done +deactivate TCPClient +CloudPointClient -> TCPClient : await for response +activate TCPClient +TCPClient -> TCPClient : await for packet +TCPServer -> CloudPointServer : callMethod +activate CloudPointServer +CloudPointServer -> UnityWorld : addToStaticQueue + +UnityWorld -> UnityWorld : read from queue +activate UnityWorld +UnityWorld -> UnityWorld : callMethod +UnityWorld -> CloudPointServer: set task return value +deactivate UnityWorld +CloudPointServer -> TCPServer : return task +deactivate CloudPointServer +TCPServer -> TCPClient : send response +TCPClient -> TCPServer : response read +TCPClient -> CloudPointClient : response received +TCPServer -> CloudPointServer : done +deactivate TCPServer +CloudPointClient -> Caller : here what you wanted +deactivate CloudPointClient + +Caller -> CloudPointClient : destruct +CloudPointClient -> TCPClient : finish waiting +deactivate TCPClient +deactivate CloudPointClient +UnityWorld -> CloudPointServer : destruct +deactivate CloudPointServer + +@enduml +``` + ## Development The project uses **Meson** build system and **C++20**. diff --git a/config.yaml b/config.yaml deleted file mode 100644 index 7d95af4..0000000 --- a/config.yaml +++ /dev/null @@ -1,11 +0,0 @@ -server: - ip: "127.0.0.1" - port: 8085 - -test_data: - intrinsic_params: [1.1, 0.0, 0.0, 0.0, 1.1, 0.0, 0.0, 0.0, 1.0] - extrinsic_params: [1.0, 0.0, 0.0, 0.5, 0.0, 1.0, 0.0, 0.5, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0] - cloud_point: - - [0.1, 0.2, 0.3] - - [1.1, 1.2, 1.3] - - [5.5, 6.6, 7.7] diff --git a/include/cloud_point_rpc/cli.hpp b/include/cloud_point_rpc/cli.hpp index 614913c..60e0650 100644 --- a/include/cloud_point_rpc/cli.hpp +++ b/include/cloud_point_rpc/cli.hpp @@ -7,13 +7,14 @@ namespace cloud_point_rpc { /** * @brief Runs the CLI client. - * + * * @param input Input stream (usually std::cin) * @param output Output stream (usually std::cout) * @param ip Server IP * @param port Server Port * @return int exit code */ -int run_cli(std::istream& input, std::ostream& output, const std::string& ip, int port); +int run_cli(std::istream &input, std::ostream &output, const std::string &ip, + int port); } // namespace cloud_point_rpc diff --git a/include/cloud_point_rpc/config.hpp b/include/cloud_point_rpc/config.hpp index 21f2ada..cb26526 100644 --- a/include/cloud_point_rpc/config.hpp +++ b/include/cloud_point_rpc/config.hpp @@ -1,66 +1,72 @@ #pragma once +#include +#include #include #include #include -#include -#include namespace cloud_point_rpc { struct ServerConfig { - std::string ip; - int port; + std::string ip; + int port; }; struct TestData { - std::vector intrinsic_params; - std::vector extrinsic_params; - std::vector> cloud_point; + std::vector intrinsic_params; + std::vector extrinsic_params; + std::vector> cloud_point; }; struct Config { - ServerConfig server; - TestData test_data; + ServerConfig server; + TestData test_data; }; class ConfigLoader { - public: - static Config load(const std::string& path) { - try { - YAML::Node config = YAML::LoadFile(path); - Config c; - - // Server - if (config["server"]) { - c.server.ip = config["server"]["ip"].as("127.0.0.1"); - c.server.port = config["server"]["port"].as(8080); - } else { - c.server.ip = "127.0.0.1"; - c.server.port = 8080; - LOG(WARNING) << "No 'server' section, using defaults."; - } + public: + static Config load(const std::string &path) { + try { + YAML::Node config = YAML::LoadFile(path); + Config c; - // Test Data - if (config["test_data"]) { - c.test_data.intrinsic_params = config["test_data"]["intrinsic_params"].as>(); - c.test_data.extrinsic_params = config["test_data"]["extrinsic_params"].as>(); - - // Parse cloud_point (list of lists) - auto cp_node = config["test_data"]["cloud_point"]; - for (const auto& point_node : cp_node) { - c.test_data.cloud_point.push_back(point_node.as>()); + // Server + if (config["server"]) { + c.server.ip = + config["server"]["ip"].as("127.0.0.1"); + c.server.port = config["server"]["port"].as(8080); + } else { + c.server.ip = "127.0.0.1"; + c.server.port = 8080; + LOG(WARNING) << "No 'server' section, using defaults."; + } + + // Test Data + if (config["test_data"]) { + c.test_data.intrinsic_params = + config["test_data"]["intrinsic_params"] + .as>(); + c.test_data.extrinsic_params = + config["test_data"]["extrinsic_params"] + .as>(); + + // Parse cloud_point (list of lists) + auto cp_node = config["test_data"]["cloud_point"]; + for (const auto &point_node : cp_node) { + c.test_data.cloud_point.push_back( + point_node.as>()); + } + } else { + LOG(WARNING) << "No 'test_data' section, using empty/defaults."; + } + + return c; + } catch (const YAML::Exception &e) { + LOG(ERROR) << "Failed to load config: " << e.what(); + throw std::runtime_error("Config load failed"); } - } else { - LOG(WARNING) << "No 'test_data' section, using empty/defaults."; - } - - return c; - } catch (const YAML::Exception& e) { - LOG(ERROR) << "Failed to load config: " << e.what(); - throw std::runtime_error("Config load failed"); } - } }; } // namespace cloud_point_rpc diff --git a/include/cloud_point_rpc/rpc_client.hpp b/include/cloud_point_rpc/rpc_client.hpp index 0a23508..1dff2a7 100644 --- a/include/cloud_point_rpc/rpc_client.hpp +++ b/include/cloud_point_rpc/rpc_client.hpp @@ -1,82 +1,40 @@ #pragma once -#include -#include -#include -#include +#include "tcp_connector.hpp" #include -#include +#include +#include +#include +#include namespace cloud_point_rpc { -class RpcClient { - public: - RpcClient() : socket_(io_context_) {} +class RpcClient : public jsonrpccxx::JsonRpcClient { + public: + RpcClient(TCPConnector &connector) + : jsonrpccxx::JsonRpcClient(connector, jsonrpccxx::version::v2) {} - void connect(const std::string& ip, int port) { - try { - LOG(INFO) << "Client connecting to " << ip << ":" << port; - asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip), port); - socket_.connect(endpoint); - LOG(INFO) << "Client connected"; - } catch (const std::exception& e) { - throw std::runtime_error(std::string("Connection Failed: ") + e.what()); + [[nodiscard]] std::vector get_intrinsic_params() { + return call>("get-intrinsic-params"); } - } - [[nodiscard]] std::vector get_intrinsic_params() { - return call("get-intrinsic-params")["result"].get>(); - } - - [[nodiscard]] std::vector get_extrinsic_params() { - return call("get-extrinsic-params")["result"].get>(); - } - - [[nodiscard]] std::vector> get_cloud_point() { - return call("get-cloud-point")["result"].get>>(); - } - - [[nodiscard]] nlohmann::json call(const std::string& method, const nlohmann::json& params = nlohmann::json::object()) { - using json = nlohmann::json; - - // Create Request - json request = { - {"jsonrpc", "2.0"}, - {"method", method}, - {"params", params}, - {"id", ++id_counter_} - }; - std::string request_str = request.dump(); - - // Send - LOG(INFO) << "Client sending: " << request_str; - asio::write(socket_, asio::buffer(request_str)); - - // Read Response - LOG(INFO) << "Client reading response..."; - std::vector buffer(65536); - asio::error_code ec; - size_t len = socket_.read_some(asio::buffer(buffer), ec); - if (ec) throw std::system_error(ec); - - LOG(INFO) << "Client read " << len << " bytes"; - json response = json::parse(std::string(buffer.data(), len)); - - if (response.contains("error")) { - throw std::runtime_error(response["error"]["message"].get()); + [[nodiscard]] std::vector get_extrinsic_params() { + return call>("get-extrinsic-params"); } - - return response; - } - ~RpcClient() { - // Socket closes automatically - } + [[nodiscard]] std::vector> get_cloud_point() { + return call>>("get-cloud-point"); + } - private: - asio::io_context io_context_; - asio::ip::tcp::socket socket_; - int id_counter_ = 0; + template + [[nodiscard]] ReturnType call(std::string_view name) { + return this->CallMethod(id++, name.data()); + } + + ~RpcClient() = default; + + private: + int id{0}; }; } // namespace cloud_point_rpc diff --git a/include/cloud_point_rpc/rpc_server.hpp b/include/cloud_point_rpc/rpc_server.hpp index 285da62..349f7cc 100644 --- a/include/cloud_point_rpc/rpc_server.hpp +++ b/include/cloud_point_rpc/rpc_server.hpp @@ -9,13 +9,13 @@ namespace cloud_point_rpc { class RpcServer { -public: - using Handler = std::function; + public: + using Handler = std::function; - void register_method(const std::string& name, Handler handler); - [[nodiscard]] std::string process(const std::string& request_str); + void register_method(const std::string &name, Handler handler); + [[nodiscard]] std::string process(const std::string &request_str); -private: + private: std::map handlers_; }; diff --git a/include/cloud_point_rpc/serialize.hpp b/include/cloud_point_rpc/serialize.hpp new file mode 100644 index 0000000..ee19dd0 --- /dev/null +++ b/include/cloud_point_rpc/serialize.hpp @@ -0,0 +1,37 @@ +#pragma once +#include +#include +#include +#include +#include + +namespace cloud_point_rpc { + +template +concept NumericType = requires(T param) { + requires std::is_integral_v || std::is_floating_point_v; + requires !std::is_same_v; + requires std::is_arithmetic_v; + requires !std::is_pointer_v; +}; + +template std::vector serialize(const T &data) { + std::vector buffer; + buffer.resize(sizeof(data)); + + std::memcpy(buffer.data(), &data, sizeof(data)); + + return buffer; +} + +inline void inplace_size_embedding(std::string &str) { + auto size = str.size(); + auto tmp = serialize(size); + str.insert(str.begin(), tmp.begin(), tmp.end()); +} + +template T deserialize(const std::vector &buffer) { + return *reinterpret_cast(buffer.data()); +} + +} // namespace cloud_point_rpc \ No newline at end of file diff --git a/include/cloud_point_rpc/service.hpp b/include/cloud_point_rpc/service.hpp index cd213e7..c8cea62 100644 --- a/include/cloud_point_rpc/service.hpp +++ b/include/cloud_point_rpc/service.hpp @@ -1,20 +1,20 @@ #pragma once -#include #include "cloud_point_rpc/config.hpp" +#include namespace cloud_point_rpc { class Service { - public: - explicit Service(const TestData& data = {}); + public: + explicit Service(const TestData &data = {}); - [[nodiscard]] std::vector get_intrinsic_params() const; - [[nodiscard]] std::vector get_extrinsic_params() const; - [[nodiscard]] std::vector> get_cloud_point() const; + [[nodiscard]] std::vector get_intrinsic_params() const; + [[nodiscard]] std::vector get_extrinsic_params() const; + [[nodiscard]] std::vector> get_cloud_point() const; - private: - TestData data_; + private: + TestData data_; }; } // namespace cloud_point_rpc diff --git a/include/cloud_point_rpc/tcp_connector.hpp b/include/cloud_point_rpc/tcp_connector.hpp new file mode 100644 index 0000000..e5286ef --- /dev/null +++ b/include/cloud_point_rpc/tcp_connector.hpp @@ -0,0 +1,80 @@ +#pragma once +#include "cloud_point_rpc/serialize.hpp" +#include "jsonrpccxx/iclientconnector.hpp" +#include +#include +#include + +namespace cloud_point_rpc { +/** + * TCPConnector main purpose is to implement jsonrpccxx::IClientConnector Send + * method As an internal implementation, TCPConnector adds to the beginning of + * the json "expected size" of a packet in case when the packet was for some + * reason on any level fractured. + */ +class TCPConnector : public jsonrpccxx::IClientConnector { + public: + TCPConnector(const std::string &ip, size_t port) noexcept(false) + : io_context_(), socket_(io_context_) { + try { + LOG(INFO) << "Client connecting to " << ip << ":" << port; + asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip), port); + socket_.connect(endpoint); + LOG(INFO) << "Client connected"; + } catch (const std::exception &e) { + throw std::runtime_error(std::string("Connection Failed: ") + + e.what()); + } + } + + std::string Send(const std::string &request) override { + // Send + LOG(INFO) << "Client sending: " << request + << " Size: " << request.size(); + auto remove_const = request; + inplace_size_embedding(remove_const); + asio::write(socket_, asio::buffer(remove_const)); + return read(); + } + + protected: + std::string read() { + std::string result; + std::array header; + LOG(INFO) << "trying to read"; + try { + size_t len = + asio::read(socket_, asio::buffer(header, header.size())); + if (len != sizeof(uint64_t)) { + LOG(ERROR) << "failed to read header"; + return result; + } + std::vector v(header.begin(), header.begin() + 8); + uint64_t packet_size = deserialize(v); + LOG(INFO) << "Received packet of size: " << packet_size; + + std::vector payload(packet_size); + len = asio::read(socket_, asio::buffer(payload)); + + // if failed to fetch in one pack, try to read until size + while (len < packet_size) { + len += asio::read(socket_, asio::buffer(payload.data() + len, + payload.size() - len)); + } + + LOG(INFO) << std::format("Was able to read len={}", len); + result = std::string(payload.begin(), payload.end()); + LOG(INFO) << "Payload: \n" << result; + } catch (const std::exception &e) { + LOG(WARNING) << "Connector handling error: " << e.what(); + } + + return result; + } + + private: + asio::io_context io_context_; + asio::ip::tcp::socket socket_; +}; + +} // namespace cloud_point_rpc \ No newline at end of file diff --git a/include/cloud_point_rpc/tcp_server.hpp b/include/cloud_point_rpc/tcp_server.hpp index 9b66b61..f9278d8 100644 --- a/include/cloud_point_rpc/tcp_server.hpp +++ b/include/cloud_point_rpc/tcp_server.hpp @@ -1,129 +1,147 @@ #pragma once -#include +#include "cloud_point_rpc/serialize.hpp" +#include +#include #include #include +#include #include -#include -#include namespace cloud_point_rpc { class TcpServer { - public: - using RequestProcessor = std::function; + public: + using RequestProcessor = std::function; - TcpServer(const std::string& ip, int port, RequestProcessor processor) - : ip_(ip), port_(port), processor_(std::move(processor)), - acceptor_(io_context_), running_(false) {} + TcpServer(const std::string &ip, int port, RequestProcessor processor) + : ip_(ip), port_(port), processor_(std::move(processor)), + acceptor_(io_context_), running_(false) {} - ~TcpServer() { stop(); } + ~TcpServer() { stop(); } - void start() { - try { - asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_), port_); - acceptor_.open(endpoint.protocol()); - acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true)); - acceptor_.bind(endpoint); - acceptor_.listen(); + void start() { + try { + asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_), + port_); + acceptor_.open(endpoint.protocol()); + acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true)); + acceptor_.bind(endpoint); + acceptor_.listen(); - running_ = true; - LOG(INFO) << "Server listening on " << ip_ << ":" << port_; + running_ = true; + LOG(INFO) << "Server listening on " << ip_ << ":" << port_; - accept_thread_ = std::jthread([this]() { - LOG(INFO) << "Accept thread started"; - while (running_) { - try { - auto socket = std::make_shared(io_context_); - acceptor_.accept(*socket); - - LOG(INFO) << "New connection from " << socket->remote_endpoint().address().to_string(); + accept_thread_ = std::jthread([this]() { + LOG(INFO) << "Accept thread started"; + while (running_) { + try { + auto socket = std::make_shared( + io_context_); + acceptor_.accept(*socket); - std::jthread([this, socket]() { - handle_client(socket); - }).detach(); - } catch (const std::system_error& e) { - LOG(INFO) << "Accept exception: " << e.what(); - if (running_) { - LOG(WARNING) << "Accept failed: " << e.what(); - } - } + LOG(INFO) + << "New connection from " + << socket->remote_endpoint().address().to_string(); + + std::jthread([this, socket]() { + handle_client(socket); + }).detach(); + } catch (const std::system_error &e) { + LOG(INFO) << "Accept exception: " << e.what(); + if (running_) { + LOG(WARNING) << "Accept failed: " << e.what(); + } + } + } + LOG(INFO) << "Accept thread exiting"; + }); + } catch (const std::exception &e) { + LOG(ERROR) << "Server start failed: " << e.what(); + throw; } - LOG(INFO) << "Accept thread exiting"; - }); - } catch (const std::exception& e) { - LOG(ERROR) << "Server start failed: " << e.what(); - throw; } - } - void stop() { - if (!running_) return; - LOG(INFO) << "Stopping server..."; - running_ = false; - // Closing acceptor unblocks accept() call usually, but sometimes we need to prod it - asio::error_code ec; - acceptor_.close(ec); - - // Ensure accept unblocks by connecting a dummy socket - try { - asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_), port_); - asio::ip::tcp::socket dummy_sock(io_context_); - asio::error_code connect_ec; - dummy_sock.connect(endpoint, connect_ec); - } catch (...) { - // Ignore + void stop() { + if (!running_) + return; + LOG(INFO) << "Stopping server..."; + running_ = false; + // Closing acceptor unblocks accept() call usually, but sometimes we + // need to prod it + asio::error_code ec; + std::ignore = acceptor_.close(ec); + if (ec.value()) { + LOG(ERROR) << std::format( + "acceptor closed with a value returned = {}", ec.value()); + } + // Ensure accept unblocks by connecting a dummy socket + try { + asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_), + port_); + asio::ip::tcp::socket dummy_sock(io_context_); + asio::error_code connect_ec; + std::ignore = dummy_sock.connect(endpoint, connect_ec); + } catch (...) { + // Ignore + } + LOG(INFO) << "Acceptor closed"; } - LOG(INFO) << "Acceptor closed"; - } - void join() { - if (accept_thread_.joinable()) { - LOG(INFO) << "Joining accept thread..."; - accept_thread_.join(); - LOG(INFO) << "Accept thread joined"; + void join() { + if (accept_thread_.joinable()) { + LOG(INFO) << "Joining accept thread..."; + accept_thread_.join(); + LOG(INFO) << "Accept thread joined"; + } } - } - private: - void handle_client(std::shared_ptr socket) { - try { - asio::streambuf buffer; - // Read until newline or EOF/error - // Note: This matches the client implementation which should send a newline - // However, previous implementation read 4096 bytes raw. - // Let's emulate "read some" to match previous simple behavior, or use read_until if we enforce framing. - // Given this is a prototype, let's read once. - - LOG(INFO) << "Server reading from client..."; - char data[4096]; - size_t length = socket->read_some(asio::buffer(data, 4096)); // Error will throw - LOG(INFO) << "Server read " << length << " bytes"; - - if (length > 0) { - std::string request(data, length); - LOG(INFO) << "Server processing request: " << request; - std::string response = processor_(request); - response += "\n"; - LOG(INFO) << "Server sending response: " << response; - asio::write(*socket, asio::buffer(response)); - LOG(INFO) << "Server sent response"; - } - } catch (const std::exception& e) { - LOG(WARNING) << "Client handling error: " << e.what(); + private: + void handle_client(std::shared_ptr socket) { + try { + LOG(INFO) << "Server reading from client..."; + + std::array header; + size_t header_length = asio::read(*socket, asio::buffer(header)); + if (header_length != 8) { + LOG(WARNING) << "Invalid header length: " << header_length; + return; + } + uint64_t packet_size = *reinterpret_cast(header.data()); + LOG(INFO) << "Expected packet size: " << packet_size; + + std::vector payload(packet_size); + size_t payload_length = asio::read(*socket, asio::buffer(payload)); + LOG(INFO) << "Read payload length: " << payload_length; + if (payload_length != packet_size) { + LOG(WARNING) << "Payload length mismatch: expected " + << packet_size << ", got " << payload_length; + return; + } + if (payload_length > 0) { + std::string request(payload.data(), payload_length); + LOG(INFO) << "Server processing request: " << request; + std::string response = processor_(request); + response += "\n"; + LOG(INFO) << "Server sending response: " << response; + inplace_size_embedding(response); + asio::write(*socket, asio::buffer(response)); + LOG(INFO) << "Server sent response"; + } + } catch (const std::exception &e) { + LOG(WARNING) << "Client handling error: " << e.what(); + } } - // Socket closes when shared_ptr dies - } - std::string ip_; - int port_; - RequestProcessor processor_; - - asio::io_context io_context_; - asio::ip::tcp::acceptor acceptor_; - - std::atomic running_; - std::jthread accept_thread_; + std::string ip_; + int port_; + RequestProcessor processor_; + + asio::io_context io_context_; + asio::ip::tcp::acceptor acceptor_; + + std::atomic running_; + std::jthread accept_thread_; }; } // namespace cloud_point_rpc diff --git a/src/cli.cpp b/src/cli.cpp index c7067a8..75668db 100644 --- a/src/cli.cpp +++ b/src/cli.cpp @@ -1,11 +1,11 @@ #include "cloud_point_rpc/cli.hpp" #include "cloud_point_rpc/rpc_client.hpp" #include -#include +#include namespace cloud_point_rpc { -void print_menu(std::ostream& output) { +void print_menu(std::ostream &output) { output << "\n=== Cloud Point RPC CLI ===" << std::endl; output << "1. get-intrinsic-params" << std::endl; output << "2. get-extrinsic-params" << std::endl; @@ -14,19 +14,39 @@ void print_menu(std::ostream& output) { output << "Select an option: "; } -int run_cli(std::istream& input, std::ostream& output, const std::string& ip, int port) { +template std::string vector_to_string(const std::vector &v) { + std::string result; + for (auto &el : v) { + result += std::to_string(el) + " "; + } + return result; +} + +template +std::string vector_to_string(const std::vector> &v) { + std::string result; + for (auto &el : v) { + result += vector_to_string(el) + "\n"; + } + return result; +} + +int run_cli(std::istream &input, std::ostream &output, const std::string &ip, + int port) { try { - RpcClient client; - client.connect(ip, port); - + TCPConnector connector(ip, port); + RpcClient client(connector); + output << "Connected to " << ip << ":" << port << std::endl; std::string choice; while (true) { print_menu(output); - if (!(input >> choice)) break; + if (!(input >> choice)) + break; - if (choice == "0") break; + if (choice == "0") + break; std::string method; if (choice == "1") { @@ -41,13 +61,24 @@ int run_cli(std::istream& input, std::ostream& output, const std::string& ip, in } try { - auto response = client.call(method); - output << "\nResponse:\n" << response.dump(4) << std::endl; - } catch (const std::exception& e) { + + if (method == "get-intrinsic-params") { + auto response = client.get_intrinsic_params(); + output << vector_to_string(response); + } + if (method == "get-extrinsic-params") { + auto response = client.get_extrinsic_params(); + output << vector_to_string(response); + } + if (method == "get-cloud-point") { + auto response = client.get_cloud_point(); + output << vector_to_string(response); + } + } catch (const std::exception &e) { output << "\nRPC Error: " << e.what() << std::endl; } } - } catch (const std::exception& e) { + } catch (const std::exception &e) { LOG(ERROR) << "CLI Error: " << e.what(); output << "Error: " << e.what() << std::endl; return 1; diff --git a/src/main.cpp b/src/main.cpp index f96ee19..3068468 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,12 +1,12 @@ -#include -#include #include "cloud_point_rpc/cli.hpp" #include "cloud_point_rpc/config.hpp" +#include #include +#include -int main(int argc, char* argv[]) { +int main(int argc, char *argv[]) { google::InitGoogleLogging(argv[0]); - FLAGS_logtostderr = 1; + FLAGS_logtostderr = 1; std::string config_path = "config.yaml"; if (argc > 1) { @@ -17,15 +17,17 @@ int main(int argc, char* argv[]) { std::ifstream f(config_path.c_str()); if (!f.good()) { std::cerr << "Config file not found: " << config_path << std::endl; - std::cerr << "Please create config.yaml or provide path as argument." << std::endl; + std::cerr << "Please create config.yaml or provide path as argument." + << std::endl; return 1; } f.close(); try { auto config = cloud_point_rpc::ConfigLoader::load(config_path); - return cloud_point_rpc::run_cli(std::cin, std::cout, config.server.ip, config.server.port); - } catch (const std::exception& e) { + return cloud_point_rpc::run_cli(std::cin, std::cout, config.server.ip, + config.server.port); + } catch (const std::exception &e) { std::cerr << "Failed to start CLI: " << e.what() << std::endl; return 1; } diff --git a/src/rpc_server.cpp b/src/rpc_server.cpp index 4282d3f..477af79 100644 --- a/src/rpc_server.cpp +++ b/src/rpc_server.cpp @@ -1,61 +1,59 @@ #include "cloud_point_rpc/rpc_server.hpp" -#include - +#include using json = nlohmann::json; namespace cloud_point_rpc { namespace { -json create_error(int code, const std::string& message, const json& id = nullptr) { - return { - {"jsonrpc", "2.0"}, - {"error", {{"code", code}, {"message", message}}}, - {"id", id} - }; +json create_error(int code, const std::string &message, + const json &id = nullptr) { + return {{"jsonrpc", "2.0"}, + {"error", {{"code", code}, {"message", message}}}, + {"id", id}}; } -json create_success(const json& result, const json& id) { - return { - {"jsonrpc", "2.0"}, - {"result", result}, - {"id", id} - }; +json create_success(const json &result, const json &id) { + return {{"jsonrpc", "2.0"}, {"result", result}, {"id", id}}; } } // namespace -void RpcServer::register_method(const std::string& name, Handler handler) { - handlers_[name] = std::move(handler); +void RpcServer::register_method(const std::string &name, Handler handler) { + handlers_[name] = std::move(handler); } -std::string RpcServer::process(const std::string& request_str) { - json request; - try { - request = json::parse(request_str); - } catch (const json::parse_error&) { - return create_error(-32700, "Parse error").dump(); - } +std::string RpcServer::process(const std::string &request_str) { + json request; + try { + request = json::parse(request_str); + } catch (const json::parse_error &) { + LOG(ERROR) << "json parse error" << __func__; + return create_error(-32700, "Parse error").dump(); + } - // Batch requests are not supported in this minimal version, assume single object - if (!request.contains("jsonrpc") || request["jsonrpc"] != "2.0" || - !request.contains("method") || !request.contains("id")) { - return create_error(-32600, "Invalid Request", request.value("id", json(nullptr))).dump(); - } + // Batch requests are not supported in this minimal version, assume single + // object + if (!request.contains("jsonrpc") || request["jsonrpc"] != "2.0" || + !request.contains("method") || !request.contains("id")) { + return create_error(-32600, "Invalid Request", + request.value("id", json(nullptr))) + .dump(); + } - std::string method = request["method"]; - json id = request["id"]; - json params = request.value("params", json::object()); + std::string method = request["method"]; + json id = request["id"]; + json params = request.value("params", json::object()); - auto it = handlers_.find(method); - if (it == handlers_.end()) { - return create_error(-32601, "Method not found", id).dump(); - } + auto it = handlers_.find(method); + if (it == handlers_.end()) { + return create_error(-32601, "Method not found", id).dump(); + } - try { - json result = it->second(params); - return create_success(result, id).dump(); - } catch (const std::exception& e) { - return create_error(-32000, e.what(), id).dump(); // Server error - } + try { + json result = it->second(params); + return create_success(result, id).dump(); + } catch (const std::exception &e) { + return create_error(-32000, e.what(), id).dump(); // Server error + } } } // namespace cloud_point_rpc diff --git a/src/server_main.cpp b/src/server_main.cpp index 0cf1bbf..7ff4c9f 100644 --- a/src/server_main.cpp +++ b/src/server_main.cpp @@ -1,58 +1,57 @@ -#include -#include -#include +#include "cloud_point_rpc/config.hpp" #include "cloud_point_rpc/rpc_server.hpp" #include "cloud_point_rpc/service.hpp" -#include "cloud_point_rpc/config.hpp" #include "cloud_point_rpc/tcp_server.hpp" +#include +#include using json = nlohmann::json; -int main(int argc, char* argv[]) { - google::InitGoogleLogging(argv[0]); - google::InstallFailureSignalHandler(); - FLAGS_alsologtostderr = 1; +int main(int argc, char *argv[]) { + google::InitGoogleLogging(argv[0]); + google::InstallFailureSignalHandler(); + FLAGS_alsologtostderr = 1; - std::string config_path = "config.yaml"; - if (argc > 1) { - config_path = argv[1]; - } + std::string config_path = "config.yaml"; + if (argc > 1) { + config_path = argv[1]; + } - LOG(INFO) << "Starting Cloud Point RPC Server (Test Mock)..."; + LOG(INFO) << "Starting Cloud Point RPC Server (Test Mock)..."; - try { - auto config = cloud_point_rpc::ConfigLoader::load(config_path); - LOG(INFO) << "Loaded config from " << config_path; + try { + auto config = cloud_point_rpc::ConfigLoader::load(config_path); + LOG(INFO) << "Loaded config from " << config_path; - // Inject test data into service - cloud_point_rpc::Service service(config.test_data); - cloud_point_rpc::RpcServer rpc_server; + // Inject test data into service + cloud_point_rpc::Service service(config.test_data); + cloud_point_rpc::RpcServer rpc_server; - rpc_server.register_method("get-intrinsic-params", [&](const json&) { - return service.get_intrinsic_params(); - }); + rpc_server.register_method("get-intrinsic-params", [&](const json &) { + return service.get_intrinsic_params(); + }); - rpc_server.register_method("get-extrinsic-params", [&](const json&) { - return service.get_extrinsic_params(); - }); + rpc_server.register_method("get-extrinsic-params", [&](const json &) { + return service.get_extrinsic_params(); + }); - rpc_server.register_method("get-cloud-point", [&](const json&) { - return service.get_cloud_point(); - }); + rpc_server.register_method("get-cloud-point", [&](const json &) { + return service.get_cloud_point(); + }); - cloud_point_rpc::TcpServer server(config.server.ip, config.server.port, - [&](const std::string& request) { - return rpc_server.process(request); - } - ); + cloud_point_rpc::TcpServer server(config.server.ip, config.server.port, + [&](const std::string &request) { + return rpc_server.process( + request); + }); - server.start(); - server.join(); + server.start(); + server.join(); - } catch (const std::exception& e) { - LOG(ERROR) << "Fatal error: " << e.what(); - return 1; - } + } catch (const std::exception &e) { + LOG(ERROR) << "Fatal error: " << e.what(); + return 1; + } - return 0; + return 0; } diff --git a/src/service.cpp b/src/service.cpp index c976ddf..e88cb82 100644 --- a/src/service.cpp +++ b/src/service.cpp @@ -2,37 +2,29 @@ namespace cloud_point_rpc { -Service::Service(const TestData& data) : data_(data) {} +Service::Service(const TestData &data) : data_(data) {} std::vector Service::get_intrinsic_params() const { - if (data_.intrinsic_params.empty()) { - // Fallback if no config loaded - return {1.0, 0.0, 0.0, - 0.0, 1.0, 0.0, - 0.0, 0.0, 1.0}; - } - return data_.intrinsic_params; + if (data_.intrinsic_params.empty()) { + // Fallback if no config loaded + return {1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0}; + } + return data_.intrinsic_params; } std::vector Service::get_extrinsic_params() const { - if (data_.extrinsic_params.empty()) { - return {1.0, 0.0, 0.0, 0.0, - 0.0, 1.0, 0.0, 0.0, - 0.0, 0.0, 1.0, 0.0, - 0.0, 0.0, 0.0, 1.0}; - } - return data_.extrinsic_params; + if (data_.extrinsic_params.empty()) { + return {1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, + 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0}; + } + return data_.extrinsic_params; } std::vector> Service::get_cloud_point() const { - if (data_.cloud_point.empty()) { - return { - {0.1, 0.2, 0.3}, - {1.1, 1.2, 1.3}, - {2.1, 2.2, 2.3} - }; - } - return data_.cloud_point; + if (data_.cloud_point.empty()) { + return {{0.1, 0.2, 0.3}, {1.1, 1.2, 1.3}, {2.1, 2.2, 2.3}}; + } + return data_.cloud_point; } } // namespace cloud_point_rpc diff --git a/tests/meson.build b/tests/meson.build index fd6ab1c..5356e95 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -1,7 +1,8 @@ test_sources = files( 'test_rpc.cpp', 'test_integration.cpp', - 'test_cli.cpp' + 'test_cli.cpp', + 'test_tcp.cpp' ) test_exe = executable('unit_tests', diff --git a/tests/test_cli.cpp b/tests/test_cli.cpp index 38540c0..57f8236 100644 --- a/tests/test_cli.cpp +++ b/tests/test_cli.cpp @@ -1,39 +1,36 @@ -#include -#include -#include #include -#include +#include +#include #include -#include +#include +#include -#include "cloud_point_rpc/tcp_server.hpp" -#include "cloud_point_rpc/rpc_server.hpp" #include "cloud_point_rpc/cli.hpp" +#include "cloud_point_rpc/rpc_server.hpp" +#include "cloud_point_rpc/tcp_server.hpp" using namespace cloud_point_rpc; class CliTest : public ::testing::Test { -protected: + protected: void SetUp() override { - server_ip = "127.0.0.1"; + server_ip = "127.0.0.1"; server_port = 9096; - rpc_server = std::make_unique(); - rpc_server->register_method("hello", [](const nlohmann::json& params) { - return "world"; - }); + rpc_server = std::make_unique(); + rpc_server->register_method( + "hello", [](const nlohmann::json &) { return "world"; }); - tcp_server = std::make_unique(server_ip, server_port, [this](const std::string& req) { - return rpc_server->process(req); - }); + tcp_server = std::make_unique( + server_ip, server_port, [this](const std::string &req) { + return rpc_server->process(req); + }); tcp_server->start(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } - void TearDown() override { - tcp_server->stop(); - } + void TearDown() override { tcp_server->stop(); } std::string server_ip; int server_port; @@ -48,9 +45,10 @@ TEST_F(CliTest, SendsInputToServerAndReceivesResponse) { // Select option 1 (get-intrinsic-params) then 0 (exit) // First we need to make sure the rpc_server has the method registered. // Our SetUp registers "hello", let's register "get-intrinsic-params" too. - rpc_server->register_method("get-intrinsic-params", [](const nlohmann::json&) { - return std::vector{1.0, 2.0, 3.0}; - }); + rpc_server->register_method( + "get-intrinsic-params", [](const nlohmann::json &) { + return std::vector{14589.0, 22489.0, 3123124.555}; + }); input << "1" << std::endl; input << "0" << std::endl; @@ -60,8 +58,7 @@ TEST_F(CliTest, SendsInputToServerAndReceivesResponse) { EXPECT_EQ(result, 0); std::string response = output.str(); // Use more flexible check because of pretty printing - EXPECT_THAT(response, ::testing::HasSubstr("\"result\": [")); - EXPECT_THAT(response, ::testing::HasSubstr("1.0")); - EXPECT_THAT(response, ::testing::HasSubstr("2.0")); - EXPECT_THAT(response, ::testing::HasSubstr("3.0")); + EXPECT_THAT(response, ::testing::HasSubstr("14589.0")); + EXPECT_THAT(response, ::testing::HasSubstr("22489.0")); + EXPECT_THAT(response, ::testing::HasSubstr("3123124.555")); } diff --git a/tests/test_integration.cpp b/tests/test_integration.cpp index b21c960..da418fc 100644 --- a/tests/test_integration.cpp +++ b/tests/test_integration.cpp @@ -1,100 +1,97 @@ -#include -#include -#include #include +#include +#include +#include #include "cloud_point_rpc/config.hpp" +#include "cloud_point_rpc/rpc_client.hpp" #include "cloud_point_rpc/rpc_server.hpp" #include "cloud_point_rpc/service.hpp" #include "cloud_point_rpc/tcp_server.hpp" -#include "cloud_point_rpc/rpc_client.hpp" #include using namespace cloud_point_rpc; class IntegrationTest : public ::testing::Test { - protected: - void SetUp() override { - // Create a temporary config file for testing - std::ofstream config_file("config.yaml"); - config_file << "server:\n" - << " ip: \"127.0.0.1\"\n" - << " port: 9095\n" - << "test_data:\n" - << " intrinsic_params: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]\n" - << " extrinsic_params: [1,0,0,0, 0,1,0,0, 0,0,1,0, 0,0,0,1]\n" - << " cloud_point:\n" - << " - [0.1, 0.2, 0.3]\n"; - config_file.close(); + protected: + void SetUp() override { + // Create a temporary config file for testing + std::ofstream config_file("config.yaml"); + config_file + << "server:\n" + << " ip: \"127.0.0.1\"\n" + << " port: 9095\n" + << "test_data:\n" + << " intrinsic_params: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, " + "9.0]\n" + << " extrinsic_params: [1,0,0,0, 0,1,0,0, 0,0,1,0, 0,0,0,1]\n" + << " cloud_point:\n" + << " - [0.1, 0.2, 0.3]\n"; + config_file.close(); - // Setup Mock Server - try { - config_ = ConfigLoader::load("config.yaml"); - } catch (...) { - // If config fails, we can't proceed, but we should avoid crashing in TearDown - throw; - } - - service_ = std::make_unique(config_.test_data); - rpc_server_ = std::make_unique(); - - rpc_server_->register_method("get-intrinsic-params", [&](const nlohmann::json&) { - return service_->get_intrinsic_params(); - }); - - // Start Server Thread - tcp_server_ = std::make_unique(config_.server.ip, config_.server.port, - [&](const std::string& req) { - return rpc_server_->process(req); + // Setup Mock Server + try { + config_ = ConfigLoader::load("config.yaml"); + } catch (...) { + // If config fails, we can't proceed, but we should avoid crashing + // in TearDown + throw; } - ); - - server_thread_ = std::thread([&]() { - tcp_server_->start(); - }); - // Give server time to start - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - } + service_ = std::make_unique(config_.test_data); + rpc_server_ = std::make_unique(); - void TearDown() override { - if (tcp_server_) { - tcp_server_->stop(); + rpc_server_->register_method( + "get-intrinsic-params", [&](const nlohmann::json &) { + return service_->get_intrinsic_params(); + }); + + // Start Server Thread + tcp_server_ = std::make_unique( + config_.server.ip, config_.server.port, + [&](const std::string &req) { return rpc_server_->process(req); }); + + server_thread_ = std::thread([&]() { tcp_server_->start(); }); + + // Give server time to start + std::this_thread::sleep_for(std::chrono::milliseconds(200)); } - if (server_thread_.joinable()) { - server_thread_.join(); - } - // Clean up - std::remove("config.yaml"); - } - Config config_; - std::unique_ptr service_; - std::unique_ptr rpc_server_; - std::unique_ptr tcp_server_; - std::thread server_thread_; + void TearDown() override { + if (tcp_server_) { + tcp_server_->stop(); + } + if (server_thread_.joinable()) { + server_thread_.join(); + } + // Clean up + std::remove("config.yaml"); + } + + Config config_; + std::unique_ptr service_; + std::unique_ptr rpc_server_; + std::unique_ptr tcp_server_; + std::thread server_thread_; }; TEST_F(IntegrationTest, ClientCanConnectAndRetrieveValues) { - RpcClient client; - - // Act: Connect - ASSERT_NO_THROW(client.connect(config_.server.ip, config_.server.port)); + TCPConnector connector(config_.server.ip, config_.server.port); + RpcClient client(connector); - // Act: Call Method - auto params = client.get_intrinsic_params(); + // Act: Call Method + std::vector params; + EXPECT_NO_THROW(params = client.get_intrinsic_params()); - // Assert: Values match config - const auto& expected = config_.test_data.intrinsic_params; - ASSERT_EQ(params.size(), expected.size()); - for (size_t i = 0; i < params.size(); ++i) { - EXPECT_DOUBLE_EQ(params[i], expected[i]); - } + // Assert: Values match config + const auto &expected = config_.test_data.intrinsic_params; + ASSERT_EQ(params.size(), expected.size()); + for (size_t i = 0; i < params.size(); ++i) { + EXPECT_DOUBLE_EQ(params[i], expected[i]); + } } TEST_F(IntegrationTest, ClientHandlesConnectionError) { - RpcClient client; - // Try connecting to a closed port - EXPECT_THROW(client.connect("127.0.0.1", 9999), std::runtime_error); + EXPECT_THROW(TCPConnector connector("127.0.0.1", 9999), std::runtime_error); } diff --git a/tests/test_rpc.cpp b/tests/test_rpc.cpp index b687fe3..979b0fe 100644 --- a/tests/test_rpc.cpp +++ b/tests/test_rpc.cpp @@ -1,61 +1,63 @@ -#include -#include -#include -#include #include "cloud_point_rpc/rpc_server.hpp" #include "cloud_point_rpc/service.hpp" +#include +#include #include +#include +#include using json = nlohmann::json; using namespace cloud_point_rpc; class RpcServerTest : public ::testing::Test { - protected: - RpcServer server; - Service service; + protected: + RpcServer server; + Service service; - void SetUp() override { - server.register_method("get-intrinsic-params", [&](const json&) { - return service.get_intrinsic_params(); - }); - } + void SetUp() override { + server.register_method("get-intrinsic-params", [&](const json &) { + return service.get_intrinsic_params(); + }); + } }; TEST_F(RpcServerTest, GetIntrinsicParamsReturnsMatrix) { - std::string request = R"({"jsonrpc": "2.0", "method": "get-intrinsic-params", "id": 1})"; - std::string response_str = server.process(request); - - json response = json::parse(response_str); - - ASSERT_EQ(response["jsonrpc"], "2.0"); - ASSERT_EQ(response["id"], 1); - ASSERT_TRUE(response.contains("result")); - - auto result = response["result"].get>(); - EXPECT_EQ(result.size(), 9); - // Verify Identity Matrix - EXPECT_EQ(result[0], 1.0); - EXPECT_EQ(result[4], 1.0); - EXPECT_EQ(result[8], 1.0); + std::string request = + R"({"jsonrpc": "2.0", "method": "get-intrinsic-params", "id": 1})"; + std::string response_str = server.process(request); + + json response = json::parse(response_str); + + ASSERT_EQ(response["jsonrpc"], "2.0"); + ASSERT_EQ(response["id"], 1); + ASSERT_TRUE(response.contains("result")); + + auto result = response["result"].get>(); + EXPECT_EQ(result.size(), 9); + // Verify Identity Matrix + EXPECT_EQ(result[0], 1.0); + EXPECT_EQ(result[4], 1.0); + EXPECT_EQ(result[8], 1.0); } TEST_F(RpcServerTest, MethodNotFoundReturnsError) { - std::string request = R"({"jsonrpc": "2.0", "method": "unknown-method", "id": 99})"; - std::string response_str = server.process(request); - - json response = json::parse(response_str); - - ASSERT_TRUE(response.contains("error")); - EXPECT_EQ(response["error"]["code"], -32601); - EXPECT_EQ(response["error"]["message"], "Method not found"); + std::string request = + R"({"jsonrpc": "2.0", "method": "unknown-method", "id": 99})"; + std::string response_str = server.process(request); + + json response = json::parse(response_str); + + ASSERT_TRUE(response.contains("error")); + EXPECT_EQ(response["error"]["code"], -32601); + EXPECT_EQ(response["error"]["message"], "Method not found"); } TEST_F(RpcServerTest, InvalidJsonReturnsParseError) { - std::string request = R"({"jsonrpc": "2.0", "method": "broken-json...)"; - std::string response_str = server.process(request); - - json response = json::parse(response_str); - - ASSERT_TRUE(response.contains("error")); - EXPECT_EQ(response["error"]["code"], -32700); + std::string request = R"({"jsonrpc": "2.0", "method": "broken-json...)"; + std::string response_str = server.process(request); + + json response = json::parse(response_str); + + ASSERT_TRUE(response.contains("error")); + EXPECT_EQ(response["error"]["code"], -32700); } diff --git a/tests/test_tcp.cpp b/tests/test_tcp.cpp new file mode 100644 index 0000000..ba4fd9d --- /dev/null +++ b/tests/test_tcp.cpp @@ -0,0 +1,58 @@ +#include "cloud_point_rpc/serialize.hpp" +#include "cloud_point_rpc/tcp_connector.hpp" +#include "cloud_point_rpc/tcp_server.hpp" +#include +#include +#include +#include +#include + +class TcpTest : public ::testing::Test { + public: + void ExpectedResponse(const std::string &expected_response) { + expected_ = expected_response; + } + + protected: + void SetUp() override { + server_ = std::make_unique( + "127.0.0.1", 12345, [this](const std::string &request) { + EXPECT_EQ(request, expected_); + std::string msg = "Echo: " + request; + auto v = cloud_point_rpc::serialize(msg.length()); + std::string res(v.begin(), v.end()); + res += msg; + return res; + }); + server_->start(); + } + + void TearDown() override { + server_->stop(); + server_.reset(); + } + + std::string expected_; + std::unique_ptr server_; +}; + +TEST(SerializeTest, Base) { + uint64_t value{123}; + auto res = cloud_point_rpc::serialize(value); + EXPECT_EQ(value, cloud_point_rpc::deserialize(res)); +} + +TEST_F(TcpTest, EchoTest) { + constexpr std::string_view msg = "Hello, TCP Server!"; + ExpectedResponse(msg.data()); + cloud_point_rpc::TCPConnector connector("127.0.0.1", 12345); + auto res = connector.Send(msg.data()); +} + +TEST_F(TcpTest, HugeBuffer) { + static constexpr uint64_t w = 1920, h = 1080, c = 3; + std::string data(w * h * c, 77); + ExpectedResponse(data); + cloud_point_rpc::TCPConnector connector("127.0.0.1", 12345); + auto res = connector.Send(data); +} \ No newline at end of file