[jsonrpccxx] moved to external rpc impl

Detailed:
  As base used jsonrpccxx implementation paired with TCP socket
  TCP socket updated to handle dynamic sized buffers
  TCP communication protocol changed to serialized packet size after
  which json string is presented
This commit is contained in:
Artur Mukhamadiev 2026-02-07 15:43:12 +03:00 committed by Artur Mukhamadiev
parent 4ddaea91a7
commit 886e3f1879
21 changed files with 746 additions and 502 deletions

7
.clang-format Normal file
View File

@ -0,0 +1,7 @@
BasedOnStyle: LLVM
IndentWidth: 4
AlignConsecutiveAssignments:
Enabled: true
AcrossEmptyLines: true
AcrossComments: false

View File

@ -6,6 +6,77 @@ Communication JSON RPC protocol and implementation with Unity Scene.
See [API.md](API.md) for detailed request/response formats.
## Comm model
```plantuml
@startuml
box ClientProcess #LightBlue
Participant Caller
Participant CloudPointClient
Participant TCPClient
end box
box UnityProcess #LightGreen
Participant TCPServer
Participant CloudPointServer
Participant UnityWorld
end box
UnityWorld -> CloudPointServer : init thread
activate CloudPointServer
CloudPointServer -> TCPServer : await for connection
activate TCPServer
->CloudPointClient : init thread
activate CloudPointClient
CloudPointClient -> TCPClient : createConnection
TCPClient -> TCPServer : establish connection
TCPServer -> CloudPointServer : established
deactivate TCPServer
CloudPointServer -> TCPServer : await for calls
TCPServer -> TCPServer : await for packet
Caller -> CloudPointClient : I want something
activate CloudPointClient
CloudPointClient -> CloudPointClient : CallMethod<Something>
CloudPointClient -> TCPClient : send(message)
activate TCPClient
TCPClient -> TCPServer : packet send
TCPServer -> TCPServer : await for packet
activate TCPServer
TCPServer -> TCPServer : read packet
TCPServer -> TCPClient : packet read
TCPClient -> CloudPointClient : done
deactivate TCPClient
CloudPointClient -> TCPClient : await for response
activate TCPClient
TCPClient -> TCPClient : await for packet
TCPServer -> CloudPointServer : callMethod
activate CloudPointServer
CloudPointServer -> UnityWorld : addToStaticQueue
UnityWorld -> UnityWorld : read from queue
activate UnityWorld
UnityWorld -> UnityWorld : callMethod
UnityWorld -> CloudPointServer: set task return value
deactivate UnityWorld
CloudPointServer -> TCPServer : return task
deactivate CloudPointServer
TCPServer -> TCPClient : send response
TCPClient -> TCPServer : response read
TCPClient -> CloudPointClient : response received
TCPServer -> CloudPointServer : done
deactivate TCPServer
CloudPointClient -> Caller : here what you wanted
deactivate CloudPointClient
Caller -> CloudPointClient : destruct
CloudPointClient -> TCPClient : finish waiting
deactivate TCPClient
deactivate CloudPointClient
UnityWorld -> CloudPointServer : destruct
deactivate CloudPointServer
@enduml
```
## Development
The project uses **Meson** build system and **C++20**.

View File

@ -1,11 +0,0 @@
server:
ip: "127.0.0.1"
port: 8085
test_data:
intrinsic_params: [1.1, 0.0, 0.0, 0.0, 1.1, 0.0, 0.0, 0.0, 1.0]
extrinsic_params: [1.0, 0.0, 0.0, 0.5, 0.0, 1.0, 0.0, 0.5, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0]
cloud_point:
- [0.1, 0.2, 0.3]
- [1.1, 1.2, 1.3]
- [5.5, 6.6, 7.7]

View File

@ -7,13 +7,14 @@ namespace cloud_point_rpc {
/**
* @brief Runs the CLI client.
*
*
* @param input Input stream (usually std::cin)
* @param output Output stream (usually std::cout)
* @param ip Server IP
* @param port Server Port
* @return int exit code
*/
int run_cli(std::istream& input, std::ostream& output, const std::string& ip, int port);
int run_cli(std::istream &input, std::ostream &output, const std::string &ip,
int port);
} // namespace cloud_point_rpc

View File

@ -1,66 +1,72 @@
#pragma once
#include <glog/logging.h>
#include <stdexcept>
#include <string>
#include <vector>
#include <yaml-cpp/yaml.h>
#include <glog/logging.h>
#include <stdexcept>
namespace cloud_point_rpc {
struct ServerConfig {
std::string ip;
int port;
std::string ip;
int port;
};
struct TestData {
std::vector<double> intrinsic_params;
std::vector<double> extrinsic_params;
std::vector<std::vector<double>> cloud_point;
std::vector<double> intrinsic_params;
std::vector<double> extrinsic_params;
std::vector<std::vector<double>> cloud_point;
};
struct Config {
ServerConfig server;
TestData test_data;
ServerConfig server;
TestData test_data;
};
class ConfigLoader {
public:
static Config load(const std::string& path) {
try {
YAML::Node config = YAML::LoadFile(path);
Config c;
// Server
if (config["server"]) {
c.server.ip = config["server"]["ip"].as<std::string>("127.0.0.1");
c.server.port = config["server"]["port"].as<int>(8080);
} else {
c.server.ip = "127.0.0.1";
c.server.port = 8080;
LOG(WARNING) << "No 'server' section, using defaults.";
}
public:
static Config load(const std::string &path) {
try {
YAML::Node config = YAML::LoadFile(path);
Config c;
// Test Data
if (config["test_data"]) {
c.test_data.intrinsic_params = config["test_data"]["intrinsic_params"].as<std::vector<double>>();
c.test_data.extrinsic_params = config["test_data"]["extrinsic_params"].as<std::vector<double>>();
// Parse cloud_point (list of lists)
auto cp_node = config["test_data"]["cloud_point"];
for (const auto& point_node : cp_node) {
c.test_data.cloud_point.push_back(point_node.as<std::vector<double>>());
// Server
if (config["server"]) {
c.server.ip =
config["server"]["ip"].as<std::string>("127.0.0.1");
c.server.port = config["server"]["port"].as<int>(8080);
} else {
c.server.ip = "127.0.0.1";
c.server.port = 8080;
LOG(WARNING) << "No 'server' section, using defaults.";
}
// Test Data
if (config["test_data"]) {
c.test_data.intrinsic_params =
config["test_data"]["intrinsic_params"]
.as<std::vector<double>>();
c.test_data.extrinsic_params =
config["test_data"]["extrinsic_params"]
.as<std::vector<double>>();
// Parse cloud_point (list of lists)
auto cp_node = config["test_data"]["cloud_point"];
for (const auto &point_node : cp_node) {
c.test_data.cloud_point.push_back(
point_node.as<std::vector<double>>());
}
} else {
LOG(WARNING) << "No 'test_data' section, using empty/defaults.";
}
return c;
} catch (const YAML::Exception &e) {
LOG(ERROR) << "Failed to load config: " << e.what();
throw std::runtime_error("Config load failed");
}
} else {
LOG(WARNING) << "No 'test_data' section, using empty/defaults.";
}
return c;
} catch (const YAML::Exception& e) {
LOG(ERROR) << "Failed to load config: " << e.what();
throw std::runtime_error("Config load failed");
}
}
};
} // namespace cloud_point_rpc

View File

@ -1,82 +1,40 @@
#pragma once
#include <glog/logging.h>
#include <string>
#include <vector>
#include <nlohmann/json.hpp>
#include "tcp_connector.hpp"
#include <asio.hpp>
#include <stdexcept>
#include <glog/logging.h>
#include <jsonrpccxx/client.hpp>
#include <nlohmann/json.hpp>
#include <vector>
namespace cloud_point_rpc {
class RpcClient {
public:
RpcClient() : socket_(io_context_) {}
class RpcClient : public jsonrpccxx::JsonRpcClient {
public:
RpcClient(TCPConnector &connector)
: jsonrpccxx::JsonRpcClient(connector, jsonrpccxx::version::v2) {}
void connect(const std::string& ip, int port) {
try {
LOG(INFO) << "Client connecting to " << ip << ":" << port;
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip), port);
socket_.connect(endpoint);
LOG(INFO) << "Client connected";
} catch (const std::exception& e) {
throw std::runtime_error(std::string("Connection Failed: ") + e.what());
[[nodiscard]] std::vector<double> get_intrinsic_params() {
return call<std::vector<double>>("get-intrinsic-params");
}
}
[[nodiscard]] std::vector<double> get_intrinsic_params() {
return call("get-intrinsic-params")["result"].get<std::vector<double>>();
}
[[nodiscard]] std::vector<double> get_extrinsic_params() {
return call("get-extrinsic-params")["result"].get<std::vector<double>>();
}
[[nodiscard]] std::vector<std::vector<double>> get_cloud_point() {
return call("get-cloud-point")["result"].get<std::vector<std::vector<double>>>();
}
[[nodiscard]] nlohmann::json call(const std::string& method, const nlohmann::json& params = nlohmann::json::object()) {
using json = nlohmann::json;
// Create Request
json request = {
{"jsonrpc", "2.0"},
{"method", method},
{"params", params},
{"id", ++id_counter_}
};
std::string request_str = request.dump();
// Send
LOG(INFO) << "Client sending: " << request_str;
asio::write(socket_, asio::buffer(request_str));
// Read Response
LOG(INFO) << "Client reading response...";
std::vector<char> buffer(65536);
asio::error_code ec;
size_t len = socket_.read_some(asio::buffer(buffer), ec);
if (ec) throw std::system_error(ec);
LOG(INFO) << "Client read " << len << " bytes";
json response = json::parse(std::string(buffer.data(), len));
if (response.contains("error")) {
throw std::runtime_error(response["error"]["message"].get<std::string>());
[[nodiscard]] std::vector<double> get_extrinsic_params() {
return call<std::vector<double>>("get-extrinsic-params");
}
return response;
}
~RpcClient() {
// Socket closes automatically
}
[[nodiscard]] std::vector<std::vector<double>> get_cloud_point() {
return call<std::vector<std::vector<double>>>("get-cloud-point");
}
private:
asio::io_context io_context_;
asio::ip::tcp::socket socket_;
int id_counter_ = 0;
template <typename ReturnType>
[[nodiscard]] ReturnType call(std::string_view name) {
return this->CallMethod<ReturnType>(id++, name.data());
}
~RpcClient() = default;
private:
int id{0};
};
} // namespace cloud_point_rpc

View File

@ -9,13 +9,13 @@
namespace cloud_point_rpc {
class RpcServer {
public:
using Handler = std::function<nlohmann::json(const nlohmann::json&)>;
public:
using Handler = std::function<nlohmann::json(const nlohmann::json &)>;
void register_method(const std::string& name, Handler handler);
[[nodiscard]] std::string process(const std::string& request_str);
void register_method(const std::string &name, Handler handler);
[[nodiscard]] std::string process(const std::string &request_str);
private:
private:
std::map<std::string, Handler> handlers_;
};

View File

@ -0,0 +1,37 @@
#pragma once
#include <cstdint>
#include <cstring>
#include <string>
#include <type_traits>
#include <vector>
namespace cloud_point_rpc {
template <typename T>
concept NumericType = requires(T param) {
requires std::is_integral_v<T> || std::is_floating_point_v<T>;
requires !std::is_same_v<bool, T>;
requires std::is_arithmetic_v<decltype(param + 1)>;
requires !std::is_pointer_v<T>;
};
template <NumericType T> std::vector<uint8_t> serialize(const T &data) {
std::vector<uint8_t> buffer;
buffer.resize(sizeof(data));
std::memcpy(buffer.data(), &data, sizeof(data));
return buffer;
}
inline void inplace_size_embedding(std::string &str) {
auto size = str.size();
auto tmp = serialize(size);
str.insert(str.begin(), tmp.begin(), tmp.end());
}
template <NumericType T> T deserialize(const std::vector<uint8_t> &buffer) {
return *reinterpret_cast<const T *>(buffer.data());
}
} // namespace cloud_point_rpc

View File

@ -1,20 +1,20 @@
#pragma once
#include <vector>
#include "cloud_point_rpc/config.hpp"
#include <vector>
namespace cloud_point_rpc {
class Service {
public:
explicit Service(const TestData& data = {});
public:
explicit Service(const TestData &data = {});
[[nodiscard]] std::vector<double> get_intrinsic_params() const;
[[nodiscard]] std::vector<double> get_extrinsic_params() const;
[[nodiscard]] std::vector<std::vector<double>> get_cloud_point() const;
[[nodiscard]] std::vector<double> get_intrinsic_params() const;
[[nodiscard]] std::vector<double> get_extrinsic_params() const;
[[nodiscard]] std::vector<std::vector<double>> get_cloud_point() const;
private:
TestData data_;
private:
TestData data_;
};
} // namespace cloud_point_rpc

View File

@ -0,0 +1,80 @@
#pragma once
#include "cloud_point_rpc/serialize.hpp"
#include "jsonrpccxx/iclientconnector.hpp"
#include <asio.hpp>
#include <glog/logging.h>
#include <string>
namespace cloud_point_rpc {
/**
* TCPConnector main purpose is to implement jsonrpccxx::IClientConnector Send
* method As an internal implementation, TCPConnector adds to the beginning of
* the json "expected size" of a packet in case when the packet was for some
* reason on any level fractured.
*/
class TCPConnector : public jsonrpccxx::IClientConnector {
public:
TCPConnector(const std::string &ip, size_t port) noexcept(false)
: io_context_(), socket_(io_context_) {
try {
LOG(INFO) << "Client connecting to " << ip << ":" << port;
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip), port);
socket_.connect(endpoint);
LOG(INFO) << "Client connected";
} catch (const std::exception &e) {
throw std::runtime_error(std::string("Connection Failed: ") +
e.what());
}
}
std::string Send(const std::string &request) override {
// Send
LOG(INFO) << "Client sending: " << request
<< " Size: " << request.size();
auto remove_const = request;
inplace_size_embedding(remove_const);
asio::write(socket_, asio::buffer(remove_const));
return read();
}
protected:
std::string read() {
std::string result;
std::array<char, 8> header;
LOG(INFO) << "trying to read";
try {
size_t len =
asio::read(socket_, asio::buffer(header, header.size()));
if (len != sizeof(uint64_t)) {
LOG(ERROR) << "failed to read header";
return result;
}
std::vector<uint8_t> v(header.begin(), header.begin() + 8);
uint64_t packet_size = deserialize<uint64_t>(v);
LOG(INFO) << "Received packet of size: " << packet_size;
std::vector<char> payload(packet_size);
len = asio::read(socket_, asio::buffer(payload));
// if failed to fetch in one pack, try to read until size
while (len < packet_size) {
len += asio::read(socket_, asio::buffer(payload.data() + len,
payload.size() - len));
}
LOG(INFO) << std::format("Was able to read len={}", len);
result = std::string(payload.begin(), payload.end());
LOG(INFO) << "Payload: \n" << result;
} catch (const std::exception &e) {
LOG(WARNING) << "Connector handling error: " << e.what();
}
return result;
}
private:
asio::io_context io_context_;
asio::ip::tcp::socket socket_;
};
} // namespace cloud_point_rpc

View File

@ -1,129 +1,147 @@
#pragma once
#include <string>
#include "cloud_point_rpc/serialize.hpp"
#include <asio.hpp>
#include <atomic>
#include <functional>
#include <glog/logging.h>
#include <string>
#include <thread>
#include <atomic>
#include <asio.hpp>
namespace cloud_point_rpc {
class TcpServer {
public:
using RequestProcessor = std::function<std::string(const std::string&)>;
public:
using RequestProcessor = std::function<std::string(const std::string &)>;
TcpServer(const std::string& ip, int port, RequestProcessor processor)
: ip_(ip), port_(port), processor_(std::move(processor)),
acceptor_(io_context_), running_(false) {}
TcpServer(const std::string &ip, int port, RequestProcessor processor)
: ip_(ip), port_(port), processor_(std::move(processor)),
acceptor_(io_context_), running_(false) {}
~TcpServer() { stop(); }
~TcpServer() { stop(); }
void start() {
try {
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_), port_);
acceptor_.open(endpoint.protocol());
acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
void start() {
try {
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_),
port_);
acceptor_.open(endpoint.protocol());
acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
running_ = true;
LOG(INFO) << "Server listening on " << ip_ << ":" << port_;
running_ = true;
LOG(INFO) << "Server listening on " << ip_ << ":" << port_;
accept_thread_ = std::jthread([this]() {
LOG(INFO) << "Accept thread started";
while (running_) {
try {
auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
acceptor_.accept(*socket);
LOG(INFO) << "New connection from " << socket->remote_endpoint().address().to_string();
accept_thread_ = std::jthread([this]() {
LOG(INFO) << "Accept thread started";
while (running_) {
try {
auto socket = std::make_shared<asio::ip::tcp::socket>(
io_context_);
acceptor_.accept(*socket);
std::jthread([this, socket]() {
handle_client(socket);
}).detach();
} catch (const std::system_error& e) {
LOG(INFO) << "Accept exception: " << e.what();
if (running_) {
LOG(WARNING) << "Accept failed: " << e.what();
}
}
LOG(INFO)
<< "New connection from "
<< socket->remote_endpoint().address().to_string();
std::jthread([this, socket]() {
handle_client(socket);
}).detach();
} catch (const std::system_error &e) {
LOG(INFO) << "Accept exception: " << e.what();
if (running_) {
LOG(WARNING) << "Accept failed: " << e.what();
}
}
}
LOG(INFO) << "Accept thread exiting";
});
} catch (const std::exception &e) {
LOG(ERROR) << "Server start failed: " << e.what();
throw;
}
LOG(INFO) << "Accept thread exiting";
});
} catch (const std::exception& e) {
LOG(ERROR) << "Server start failed: " << e.what();
throw;
}
}
void stop() {
if (!running_) return;
LOG(INFO) << "Stopping server...";
running_ = false;
// Closing acceptor unblocks accept() call usually, but sometimes we need to prod it
asio::error_code ec;
acceptor_.close(ec);
// Ensure accept unblocks by connecting a dummy socket
try {
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_), port_);
asio::ip::tcp::socket dummy_sock(io_context_);
asio::error_code connect_ec;
dummy_sock.connect(endpoint, connect_ec);
} catch (...) {
// Ignore
void stop() {
if (!running_)
return;
LOG(INFO) << "Stopping server...";
running_ = false;
// Closing acceptor unblocks accept() call usually, but sometimes we
// need to prod it
asio::error_code ec;
std::ignore = acceptor_.close(ec);
if (ec.value()) {
LOG(ERROR) << std::format(
"acceptor closed with a value returned = {}", ec.value());
}
// Ensure accept unblocks by connecting a dummy socket
try {
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_),
port_);
asio::ip::tcp::socket dummy_sock(io_context_);
asio::error_code connect_ec;
std::ignore = dummy_sock.connect(endpoint, connect_ec);
} catch (...) {
// Ignore
}
LOG(INFO) << "Acceptor closed";
}
LOG(INFO) << "Acceptor closed";
}
void join() {
if (accept_thread_.joinable()) {
LOG(INFO) << "Joining accept thread...";
accept_thread_.join();
LOG(INFO) << "Accept thread joined";
void join() {
if (accept_thread_.joinable()) {
LOG(INFO) << "Joining accept thread...";
accept_thread_.join();
LOG(INFO) << "Accept thread joined";
}
}
}
private:
void handle_client(std::shared_ptr<asio::ip::tcp::socket> socket) {
try {
asio::streambuf buffer;
// Read until newline or EOF/error
// Note: This matches the client implementation which should send a newline
// However, previous implementation read 4096 bytes raw.
// Let's emulate "read some" to match previous simple behavior, or use read_until if we enforce framing.
// Given this is a prototype, let's read once.
LOG(INFO) << "Server reading from client...";
char data[4096];
size_t length = socket->read_some(asio::buffer(data, 4096)); // Error will throw
LOG(INFO) << "Server read " << length << " bytes";
if (length > 0) {
std::string request(data, length);
LOG(INFO) << "Server processing request: " << request;
std::string response = processor_(request);
response += "\n";
LOG(INFO) << "Server sending response: " << response;
asio::write(*socket, asio::buffer(response));
LOG(INFO) << "Server sent response";
}
} catch (const std::exception& e) {
LOG(WARNING) << "Client handling error: " << e.what();
private:
void handle_client(std::shared_ptr<asio::ip::tcp::socket> socket) {
try {
LOG(INFO) << "Server reading from client...";
std::array<char, 8> header;
size_t header_length = asio::read(*socket, asio::buffer(header));
if (header_length != 8) {
LOG(WARNING) << "Invalid header length: " << header_length;
return;
}
uint64_t packet_size = *reinterpret_cast<uint64_t *>(header.data());
LOG(INFO) << "Expected packet size: " << packet_size;
std::vector<char> payload(packet_size);
size_t payload_length = asio::read(*socket, asio::buffer(payload));
LOG(INFO) << "Read payload length: " << payload_length;
if (payload_length != packet_size) {
LOG(WARNING) << "Payload length mismatch: expected "
<< packet_size << ", got " << payload_length;
return;
}
if (payload_length > 0) {
std::string request(payload.data(), payload_length);
LOG(INFO) << "Server processing request: " << request;
std::string response = processor_(request);
response += "\n";
LOG(INFO) << "Server sending response: " << response;
inplace_size_embedding(response);
asio::write(*socket, asio::buffer(response));
LOG(INFO) << "Server sent response";
}
} catch (const std::exception &e) {
LOG(WARNING) << "Client handling error: " << e.what();
}
}
// Socket closes when shared_ptr dies
}
std::string ip_;
int port_;
RequestProcessor processor_;
asio::io_context io_context_;
asio::ip::tcp::acceptor acceptor_;
std::atomic<bool> running_;
std::jthread accept_thread_;
std::string ip_;
int port_;
RequestProcessor processor_;
asio::io_context io_context_;
asio::ip::tcp::acceptor acceptor_;
std::atomic<bool> running_;
std::jthread accept_thread_;
};
} // namespace cloud_point_rpc

View File

@ -1,11 +1,11 @@
#include "cloud_point_rpc/cli.hpp"
#include "cloud_point_rpc/rpc_client.hpp"
#include <glog/logging.h>
#include <iomanip>
#include <string>
namespace cloud_point_rpc {
void print_menu(std::ostream& output) {
void print_menu(std::ostream &output) {
output << "\n=== Cloud Point RPC CLI ===" << std::endl;
output << "1. get-intrinsic-params" << std::endl;
output << "2. get-extrinsic-params" << std::endl;
@ -14,19 +14,39 @@ void print_menu(std::ostream& output) {
output << "Select an option: ";
}
int run_cli(std::istream& input, std::ostream& output, const std::string& ip, int port) {
template <typename T> std::string vector_to_string(const std::vector<T> &v) {
std::string result;
for (auto &el : v) {
result += std::to_string(el) + " ";
}
return result;
}
template <typename T>
std::string vector_to_string(const std::vector<std::vector<T>> &v) {
std::string result;
for (auto &el : v) {
result += vector_to_string(el) + "\n";
}
return result;
}
int run_cli(std::istream &input, std::ostream &output, const std::string &ip,
int port) {
try {
RpcClient client;
client.connect(ip, port);
TCPConnector connector(ip, port);
RpcClient client(connector);
output << "Connected to " << ip << ":" << port << std::endl;
std::string choice;
while (true) {
print_menu(output);
if (!(input >> choice)) break;
if (!(input >> choice))
break;
if (choice == "0") break;
if (choice == "0")
break;
std::string method;
if (choice == "1") {
@ -41,13 +61,24 @@ int run_cli(std::istream& input, std::ostream& output, const std::string& ip, in
}
try {
auto response = client.call(method);
output << "\nResponse:\n" << response.dump(4) << std::endl;
} catch (const std::exception& e) {
if (method == "get-intrinsic-params") {
auto response = client.get_intrinsic_params();
output << vector_to_string(response);
}
if (method == "get-extrinsic-params") {
auto response = client.get_extrinsic_params();
output << vector_to_string(response);
}
if (method == "get-cloud-point") {
auto response = client.get_cloud_point();
output << vector_to_string(response);
}
} catch (const std::exception &e) {
output << "\nRPC Error: " << e.what() << std::endl;
}
}
} catch (const std::exception& e) {
} catch (const std::exception &e) {
LOG(ERROR) << "CLI Error: " << e.what();
output << "Error: " << e.what() << std::endl;
return 1;

View File

@ -1,12 +1,12 @@
#include <iostream>
#include <fstream>
#include "cloud_point_rpc/cli.hpp"
#include "cloud_point_rpc/config.hpp"
#include <fstream>
#include <glog/logging.h>
#include <iostream>
int main(int argc, char* argv[]) {
int main(int argc, char *argv[]) {
google::InitGoogleLogging(argv[0]);
FLAGS_logtostderr = 1;
FLAGS_logtostderr = 1;
std::string config_path = "config.yaml";
if (argc > 1) {
@ -17,15 +17,17 @@ int main(int argc, char* argv[]) {
std::ifstream f(config_path.c_str());
if (!f.good()) {
std::cerr << "Config file not found: " << config_path << std::endl;
std::cerr << "Please create config.yaml or provide path as argument." << std::endl;
std::cerr << "Please create config.yaml or provide path as argument."
<< std::endl;
return 1;
}
f.close();
try {
auto config = cloud_point_rpc::ConfigLoader::load(config_path);
return cloud_point_rpc::run_cli(std::cin, std::cout, config.server.ip, config.server.port);
} catch (const std::exception& e) {
return cloud_point_rpc::run_cli(std::cin, std::cout, config.server.ip,
config.server.port);
} catch (const std::exception &e) {
std::cerr << "Failed to start CLI: " << e.what() << std::endl;
return 1;
}

View File

@ -1,61 +1,59 @@
#include "cloud_point_rpc/rpc_server.hpp"
#include <iostream>
#include <glog/logging.h>
using json = nlohmann::json;
namespace cloud_point_rpc {
namespace {
json create_error(int code, const std::string& message, const json& id = nullptr) {
return {
{"jsonrpc", "2.0"},
{"error", {{"code", code}, {"message", message}}},
{"id", id}
};
json create_error(int code, const std::string &message,
const json &id = nullptr) {
return {{"jsonrpc", "2.0"},
{"error", {{"code", code}, {"message", message}}},
{"id", id}};
}
json create_success(const json& result, const json& id) {
return {
{"jsonrpc", "2.0"},
{"result", result},
{"id", id}
};
json create_success(const json &result, const json &id) {
return {{"jsonrpc", "2.0"}, {"result", result}, {"id", id}};
}
} // namespace
void RpcServer::register_method(const std::string& name, Handler handler) {
handlers_[name] = std::move(handler);
void RpcServer::register_method(const std::string &name, Handler handler) {
handlers_[name] = std::move(handler);
}
std::string RpcServer::process(const std::string& request_str) {
json request;
try {
request = json::parse(request_str);
} catch (const json::parse_error&) {
return create_error(-32700, "Parse error").dump();
}
std::string RpcServer::process(const std::string &request_str) {
json request;
try {
request = json::parse(request_str);
} catch (const json::parse_error &) {
LOG(ERROR) << "json parse error" << __func__;
return create_error(-32700, "Parse error").dump();
}
// Batch requests are not supported in this minimal version, assume single object
if (!request.contains("jsonrpc") || request["jsonrpc"] != "2.0" ||
!request.contains("method") || !request.contains("id")) {
return create_error(-32600, "Invalid Request", request.value("id", json(nullptr))).dump();
}
// Batch requests are not supported in this minimal version, assume single
// object
if (!request.contains("jsonrpc") || request["jsonrpc"] != "2.0" ||
!request.contains("method") || !request.contains("id")) {
return create_error(-32600, "Invalid Request",
request.value("id", json(nullptr)))
.dump();
}
std::string method = request["method"];
json id = request["id"];
json params = request.value("params", json::object());
std::string method = request["method"];
json id = request["id"];
json params = request.value("params", json::object());
auto it = handlers_.find(method);
if (it == handlers_.end()) {
return create_error(-32601, "Method not found", id).dump();
}
auto it = handlers_.find(method);
if (it == handlers_.end()) {
return create_error(-32601, "Method not found", id).dump();
}
try {
json result = it->second(params);
return create_success(result, id).dump();
} catch (const std::exception& e) {
return create_error(-32000, e.what(), id).dump(); // Server error
}
try {
json result = it->second(params);
return create_success(result, id).dump();
} catch (const std::exception &e) {
return create_error(-32000, e.what(), id).dump(); // Server error
}
}
} // namespace cloud_point_rpc

View File

@ -1,58 +1,57 @@
#include <iostream>
#include <string>
#include <glog/logging.h>
#include "cloud_point_rpc/config.hpp"
#include "cloud_point_rpc/rpc_server.hpp"
#include "cloud_point_rpc/service.hpp"
#include "cloud_point_rpc/config.hpp"
#include "cloud_point_rpc/tcp_server.hpp"
#include <glog/logging.h>
#include <string>
using json = nlohmann::json;
int main(int argc, char* argv[]) {
google::InitGoogleLogging(argv[0]);
google::InstallFailureSignalHandler();
FLAGS_alsologtostderr = 1;
int main(int argc, char *argv[]) {
google::InitGoogleLogging(argv[0]);
google::InstallFailureSignalHandler();
FLAGS_alsologtostderr = 1;
std::string config_path = "config.yaml";
if (argc > 1) {
config_path = argv[1];
}
std::string config_path = "config.yaml";
if (argc > 1) {
config_path = argv[1];
}
LOG(INFO) << "Starting Cloud Point RPC Server (Test Mock)...";
LOG(INFO) << "Starting Cloud Point RPC Server (Test Mock)...";
try {
auto config = cloud_point_rpc::ConfigLoader::load(config_path);
LOG(INFO) << "Loaded config from " << config_path;
try {
auto config = cloud_point_rpc::ConfigLoader::load(config_path);
LOG(INFO) << "Loaded config from " << config_path;
// Inject test data into service
cloud_point_rpc::Service service(config.test_data);
cloud_point_rpc::RpcServer rpc_server;
// Inject test data into service
cloud_point_rpc::Service service(config.test_data);
cloud_point_rpc::RpcServer rpc_server;
rpc_server.register_method("get-intrinsic-params", [&](const json&) {
return service.get_intrinsic_params();
});
rpc_server.register_method("get-intrinsic-params", [&](const json &) {
return service.get_intrinsic_params();
});
rpc_server.register_method("get-extrinsic-params", [&](const json&) {
return service.get_extrinsic_params();
});
rpc_server.register_method("get-extrinsic-params", [&](const json &) {
return service.get_extrinsic_params();
});
rpc_server.register_method("get-cloud-point", [&](const json&) {
return service.get_cloud_point();
});
rpc_server.register_method("get-cloud-point", [&](const json &) {
return service.get_cloud_point();
});
cloud_point_rpc::TcpServer server(config.server.ip, config.server.port,
[&](const std::string& request) {
return rpc_server.process(request);
}
);
cloud_point_rpc::TcpServer server(config.server.ip, config.server.port,
[&](const std::string &request) {
return rpc_server.process(
request);
});
server.start();
server.join();
server.start();
server.join();
} catch (const std::exception& e) {
LOG(ERROR) << "Fatal error: " << e.what();
return 1;
}
} catch (const std::exception &e) {
LOG(ERROR) << "Fatal error: " << e.what();
return 1;
}
return 0;
return 0;
}

View File

@ -2,37 +2,29 @@
namespace cloud_point_rpc {
Service::Service(const TestData& data) : data_(data) {}
Service::Service(const TestData &data) : data_(data) {}
std::vector<double> Service::get_intrinsic_params() const {
if (data_.intrinsic_params.empty()) {
// Fallback if no config loaded
return {1.0, 0.0, 0.0,
0.0, 1.0, 0.0,
0.0, 0.0, 1.0};
}
return data_.intrinsic_params;
if (data_.intrinsic_params.empty()) {
// Fallback if no config loaded
return {1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0};
}
return data_.intrinsic_params;
}
std::vector<double> Service::get_extrinsic_params() const {
if (data_.extrinsic_params.empty()) {
return {1.0, 0.0, 0.0, 0.0,
0.0, 1.0, 0.0, 0.0,
0.0, 0.0, 1.0, 0.0,
0.0, 0.0, 0.0, 1.0};
}
return data_.extrinsic_params;
if (data_.extrinsic_params.empty()) {
return {1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0,
0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0};
}
return data_.extrinsic_params;
}
std::vector<std::vector<double>> Service::get_cloud_point() const {
if (data_.cloud_point.empty()) {
return {
{0.1, 0.2, 0.3},
{1.1, 1.2, 1.3},
{2.1, 2.2, 2.3}
};
}
return data_.cloud_point;
if (data_.cloud_point.empty()) {
return {{0.1, 0.2, 0.3}, {1.1, 1.2, 1.3}, {2.1, 2.2, 2.3}};
}
return data_.cloud_point;
}
} // namespace cloud_point_rpc

View File

@ -1,7 +1,8 @@
test_sources = files(
'test_rpc.cpp',
'test_integration.cpp',
'test_cli.cpp'
'test_cli.cpp',
'test_tcp.cpp'
)
test_exe = executable('unit_tests',

View File

@ -1,39 +1,36 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <thread>
#include <chrono>
#include <sstream>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <iostream>
#include <future>
#include <sstream>
#include <thread>
#include "cloud_point_rpc/tcp_server.hpp"
#include "cloud_point_rpc/rpc_server.hpp"
#include "cloud_point_rpc/cli.hpp"
#include "cloud_point_rpc/rpc_server.hpp"
#include "cloud_point_rpc/tcp_server.hpp"
using namespace cloud_point_rpc;
class CliTest : public ::testing::Test {
protected:
protected:
void SetUp() override {
server_ip = "127.0.0.1";
server_ip = "127.0.0.1";
server_port = 9096;
rpc_server = std::make_unique<RpcServer>();
rpc_server->register_method("hello", [](const nlohmann::json& params) {
return "world";
});
rpc_server = std::make_unique<RpcServer>();
rpc_server->register_method(
"hello", [](const nlohmann::json &) { return "world"; });
tcp_server = std::make_unique<TcpServer>(server_ip, server_port, [this](const std::string& req) {
return rpc_server->process(req);
});
tcp_server = std::make_unique<TcpServer>(
server_ip, server_port, [this](const std::string &req) {
return rpc_server->process(req);
});
tcp_server->start();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
void TearDown() override {
tcp_server->stop();
}
void TearDown() override { tcp_server->stop(); }
std::string server_ip;
int server_port;
@ -48,9 +45,10 @@ TEST_F(CliTest, SendsInputToServerAndReceivesResponse) {
// Select option 1 (get-intrinsic-params) then 0 (exit)
// First we need to make sure the rpc_server has the method registered.
// Our SetUp registers "hello", let's register "get-intrinsic-params" too.
rpc_server->register_method("get-intrinsic-params", [](const nlohmann::json&) {
return std::vector<double>{1.0, 2.0, 3.0};
});
rpc_server->register_method(
"get-intrinsic-params", [](const nlohmann::json &) {
return std::vector<double>{14589.0, 22489.0, 3123124.555};
});
input << "1" << std::endl;
input << "0" << std::endl;
@ -60,8 +58,7 @@ TEST_F(CliTest, SendsInputToServerAndReceivesResponse) {
EXPECT_EQ(result, 0);
std::string response = output.str();
// Use more flexible check because of pretty printing
EXPECT_THAT(response, ::testing::HasSubstr("\"result\": ["));
EXPECT_THAT(response, ::testing::HasSubstr("1.0"));
EXPECT_THAT(response, ::testing::HasSubstr("2.0"));
EXPECT_THAT(response, ::testing::HasSubstr("3.0"));
EXPECT_THAT(response, ::testing::HasSubstr("14589.0"));
EXPECT_THAT(response, ::testing::HasSubstr("22489.0"));
EXPECT_THAT(response, ::testing::HasSubstr("3123124.555"));
}

View File

@ -1,100 +1,97 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <thread>
#include <chrono>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <thread>
#include "cloud_point_rpc/config.hpp"
#include "cloud_point_rpc/rpc_client.hpp"
#include "cloud_point_rpc/rpc_server.hpp"
#include "cloud_point_rpc/service.hpp"
#include "cloud_point_rpc/tcp_server.hpp"
#include "cloud_point_rpc/rpc_client.hpp"
#include <fstream>
using namespace cloud_point_rpc;
class IntegrationTest : public ::testing::Test {
protected:
void SetUp() override {
// Create a temporary config file for testing
std::ofstream config_file("config.yaml");
config_file << "server:\n"
<< " ip: \"127.0.0.1\"\n"
<< " port: 9095\n"
<< "test_data:\n"
<< " intrinsic_params: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]\n"
<< " extrinsic_params: [1,0,0,0, 0,1,0,0, 0,0,1,0, 0,0,0,1]\n"
<< " cloud_point:\n"
<< " - [0.1, 0.2, 0.3]\n";
config_file.close();
protected:
void SetUp() override {
// Create a temporary config file for testing
std::ofstream config_file("config.yaml");
config_file
<< "server:\n"
<< " ip: \"127.0.0.1\"\n"
<< " port: 9095\n"
<< "test_data:\n"
<< " intrinsic_params: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, "
"9.0]\n"
<< " extrinsic_params: [1,0,0,0, 0,1,0,0, 0,0,1,0, 0,0,0,1]\n"
<< " cloud_point:\n"
<< " - [0.1, 0.2, 0.3]\n";
config_file.close();
// Setup Mock Server
try {
config_ = ConfigLoader::load("config.yaml");
} catch (...) {
// If config fails, we can't proceed, but we should avoid crashing in TearDown
throw;
}
service_ = std::make_unique<Service>(config_.test_data);
rpc_server_ = std::make_unique<RpcServer>();
rpc_server_->register_method("get-intrinsic-params", [&](const nlohmann::json&) {
return service_->get_intrinsic_params();
});
// Start Server Thread
tcp_server_ = std::make_unique<TcpServer>(config_.server.ip, config_.server.port,
[&](const std::string& req) {
return rpc_server_->process(req);
// Setup Mock Server
try {
config_ = ConfigLoader::load("config.yaml");
} catch (...) {
// If config fails, we can't proceed, but we should avoid crashing
// in TearDown
throw;
}
);
server_thread_ = std::thread([&]() {
tcp_server_->start();
});
// Give server time to start
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
service_ = std::make_unique<Service>(config_.test_data);
rpc_server_ = std::make_unique<RpcServer>();
void TearDown() override {
if (tcp_server_) {
tcp_server_->stop();
rpc_server_->register_method(
"get-intrinsic-params", [&](const nlohmann::json &) {
return service_->get_intrinsic_params();
});
// Start Server Thread
tcp_server_ = std::make_unique<TcpServer>(
config_.server.ip, config_.server.port,
[&](const std::string &req) { return rpc_server_->process(req); });
server_thread_ = std::thread([&]() { tcp_server_->start(); });
// Give server time to start
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
if (server_thread_.joinable()) {
server_thread_.join();
}
// Clean up
std::remove("config.yaml");
}
Config config_;
std::unique_ptr<Service> service_;
std::unique_ptr<RpcServer> rpc_server_;
std::unique_ptr<TcpServer> tcp_server_;
std::thread server_thread_;
void TearDown() override {
if (tcp_server_) {
tcp_server_->stop();
}
if (server_thread_.joinable()) {
server_thread_.join();
}
// Clean up
std::remove("config.yaml");
}
Config config_;
std::unique_ptr<Service> service_;
std::unique_ptr<RpcServer> rpc_server_;
std::unique_ptr<TcpServer> tcp_server_;
std::thread server_thread_;
};
TEST_F(IntegrationTest, ClientCanConnectAndRetrieveValues) {
RpcClient client;
// Act: Connect
ASSERT_NO_THROW(client.connect(config_.server.ip, config_.server.port));
TCPConnector connector(config_.server.ip, config_.server.port);
RpcClient client(connector);
// Act: Call Method
auto params = client.get_intrinsic_params();
// Act: Call Method
std::vector<double> params;
EXPECT_NO_THROW(params = client.get_intrinsic_params());
// Assert: Values match config
const auto& expected = config_.test_data.intrinsic_params;
ASSERT_EQ(params.size(), expected.size());
for (size_t i = 0; i < params.size(); ++i) {
EXPECT_DOUBLE_EQ(params[i], expected[i]);
}
// Assert: Values match config
const auto &expected = config_.test_data.intrinsic_params;
ASSERT_EQ(params.size(), expected.size());
for (size_t i = 0; i < params.size(); ++i) {
EXPECT_DOUBLE_EQ(params[i], expected[i]);
}
}
TEST_F(IntegrationTest, ClientHandlesConnectionError) {
RpcClient client;
// Try connecting to a closed port
EXPECT_THROW(client.connect("127.0.0.1", 9999), std::runtime_error);
EXPECT_THROW(TCPConnector connector("127.0.0.1", 9999), std::runtime_error);
}

View File

@ -1,61 +1,63 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <string>
#include <vector>
#include "cloud_point_rpc/rpc_server.hpp"
#include "cloud_point_rpc/service.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <nlohmann/json.hpp>
#include <string>
#include <vector>
using json = nlohmann::json;
using namespace cloud_point_rpc;
class RpcServerTest : public ::testing::Test {
protected:
RpcServer server;
Service service;
protected:
RpcServer server;
Service service;
void SetUp() override {
server.register_method("get-intrinsic-params", [&](const json&) {
return service.get_intrinsic_params();
});
}
void SetUp() override {
server.register_method("get-intrinsic-params", [&](const json &) {
return service.get_intrinsic_params();
});
}
};
TEST_F(RpcServerTest, GetIntrinsicParamsReturnsMatrix) {
std::string request = R"({"jsonrpc": "2.0", "method": "get-intrinsic-params", "id": 1})";
std::string response_str = server.process(request);
json response = json::parse(response_str);
ASSERT_EQ(response["jsonrpc"], "2.0");
ASSERT_EQ(response["id"], 1);
ASSERT_TRUE(response.contains("result"));
auto result = response["result"].get<std::vector<double>>();
EXPECT_EQ(result.size(), 9);
// Verify Identity Matrix
EXPECT_EQ(result[0], 1.0);
EXPECT_EQ(result[4], 1.0);
EXPECT_EQ(result[8], 1.0);
std::string request =
R"({"jsonrpc": "2.0", "method": "get-intrinsic-params", "id": 1})";
std::string response_str = server.process(request);
json response = json::parse(response_str);
ASSERT_EQ(response["jsonrpc"], "2.0");
ASSERT_EQ(response["id"], 1);
ASSERT_TRUE(response.contains("result"));
auto result = response["result"].get<std::vector<double>>();
EXPECT_EQ(result.size(), 9);
// Verify Identity Matrix
EXPECT_EQ(result[0], 1.0);
EXPECT_EQ(result[4], 1.0);
EXPECT_EQ(result[8], 1.0);
}
TEST_F(RpcServerTest, MethodNotFoundReturnsError) {
std::string request = R"({"jsonrpc": "2.0", "method": "unknown-method", "id": 99})";
std::string response_str = server.process(request);
json response = json::parse(response_str);
ASSERT_TRUE(response.contains("error"));
EXPECT_EQ(response["error"]["code"], -32601);
EXPECT_EQ(response["error"]["message"], "Method not found");
std::string request =
R"({"jsonrpc": "2.0", "method": "unknown-method", "id": 99})";
std::string response_str = server.process(request);
json response = json::parse(response_str);
ASSERT_TRUE(response.contains("error"));
EXPECT_EQ(response["error"]["code"], -32601);
EXPECT_EQ(response["error"]["message"], "Method not found");
}
TEST_F(RpcServerTest, InvalidJsonReturnsParseError) {
std::string request = R"({"jsonrpc": "2.0", "method": "broken-json...)";
std::string response_str = server.process(request);
json response = json::parse(response_str);
ASSERT_TRUE(response.contains("error"));
EXPECT_EQ(response["error"]["code"], -32700);
std::string request = R"({"jsonrpc": "2.0", "method": "broken-json...)";
std::string response_str = server.process(request);
json response = json::parse(response_str);
ASSERT_TRUE(response.contains("error"));
EXPECT_EQ(response["error"]["code"], -32700);
}

58
tests/test_tcp.cpp Normal file
View File

@ -0,0 +1,58 @@
#include "cloud_point_rpc/serialize.hpp"
#include "cloud_point_rpc/tcp_connector.hpp"
#include "cloud_point_rpc/tcp_server.hpp"
#include <asio.hpp>
#include <cstdint>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <memory>
class TcpTest : public ::testing::Test {
public:
void ExpectedResponse(const std::string &expected_response) {
expected_ = expected_response;
}
protected:
void SetUp() override {
server_ = std::make_unique<cloud_point_rpc::TcpServer>(
"127.0.0.1", 12345, [this](const std::string &request) {
EXPECT_EQ(request, expected_);
std::string msg = "Echo: " + request;
auto v = cloud_point_rpc::serialize(msg.length());
std::string res(v.begin(), v.end());
res += msg;
return res;
});
server_->start();
}
void TearDown() override {
server_->stop();
server_.reset();
}
std::string expected_;
std::unique_ptr<cloud_point_rpc::TcpServer> server_;
};
TEST(SerializeTest, Base) {
uint64_t value{123};
auto res = cloud_point_rpc::serialize(value);
EXPECT_EQ(value, cloud_point_rpc::deserialize<uint64_t>(res));
}
TEST_F(TcpTest, EchoTest) {
constexpr std::string_view msg = "Hello, TCP Server!";
ExpectedResponse(msg.data());
cloud_point_rpc::TCPConnector connector("127.0.0.1", 12345);
auto res = connector.Send(msg.data());
}
TEST_F(TcpTest, HugeBuffer) {
static constexpr uint64_t w = 1920, h = 1080, c = 3;
std::string data(w * h * c, 77);
ExpectedResponse(data);
cloud_point_rpc::TCPConnector connector("127.0.0.1", 12345);
auto res = connector.Send(data);
}