[jsonrpccxx] moved to external rpc impl

Detailed:
      As base used jsonrpccxx implementation paired with TCP socket
      TCP socket updated to handle dynamic sized buffers
      TCP communication protocol changed to serialized packet size after
      which json string is presented
This commit is contained in:
amukhamadiev 2026-02-20 17:51:14 +03:00
parent 4ddaea91a7
commit ece26e7b1f
21 changed files with 746 additions and 502 deletions

7
.clang-format Normal file
View File

@ -0,0 +1,7 @@
BasedOnStyle: LLVM
IndentWidth: 4
AlignConsecutiveAssignments:
Enabled: true
AcrossEmptyLines: true
AcrossComments: false

View File

@ -6,6 +6,77 @@ Communication JSON RPC protocol and implementation with Unity Scene.
See [API.md](API.md) for detailed request/response formats.
## Comm model
```plantuml
@startuml
box ClientProcess #LightBlue
Participant Caller
Participant CloudPointClient
Participant TCPClient
end box
box UnityProcess #LightGreen
Participant TCPServer
Participant CloudPointServer
Participant UnityWorld
end box
UnityWorld -> CloudPointServer : init thread
activate CloudPointServer
CloudPointServer -> TCPServer : await for connection
activate TCPServer
->CloudPointClient : init thread
activate CloudPointClient
CloudPointClient -> TCPClient : createConnection
TCPClient -> TCPServer : establish connection
TCPServer -> CloudPointServer : established
deactivate TCPServer
CloudPointServer -> TCPServer : await for calls
TCPServer -> TCPServer : await for packet
Caller -> CloudPointClient : I want something
activate CloudPointClient
CloudPointClient -> CloudPointClient : CallMethod<Something>
CloudPointClient -> TCPClient : send(message)
activate TCPClient
TCPClient -> TCPServer : packet send
TCPServer -> TCPServer : await for packet
activate TCPServer
TCPServer -> TCPServer : read packet
TCPServer -> TCPClient : packet read
TCPClient -> CloudPointClient : done
deactivate TCPClient
CloudPointClient -> TCPClient : await for response
activate TCPClient
TCPClient -> TCPClient : await for packet
TCPServer -> CloudPointServer : callMethod
activate CloudPointServer
CloudPointServer -> UnityWorld : addToStaticQueue
UnityWorld -> UnityWorld : read from queue
activate UnityWorld
UnityWorld -> UnityWorld : callMethod
UnityWorld -> CloudPointServer: set task return value
deactivate UnityWorld
CloudPointServer -> TCPServer : return task
deactivate CloudPointServer
TCPServer -> TCPClient : send response
TCPClient -> TCPServer : response read
TCPClient -> CloudPointClient : response received
TCPServer -> CloudPointServer : done
deactivate TCPServer
CloudPointClient -> Caller : here what you wanted
deactivate CloudPointClient
Caller -> CloudPointClient : destruct
CloudPointClient -> TCPClient : finish waiting
deactivate TCPClient
deactivate CloudPointClient
UnityWorld -> CloudPointServer : destruct
deactivate CloudPointServer
@enduml
```
## Development
The project uses **Meson** build system and **C++20**.

View File

@ -1,11 +0,0 @@
server:
ip: "127.0.0.1"
port: 8085
test_data:
intrinsic_params: [1.1, 0.0, 0.0, 0.0, 1.1, 0.0, 0.0, 0.0, 1.0]
extrinsic_params: [1.0, 0.0, 0.0, 0.5, 0.0, 1.0, 0.0, 0.5, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0]
cloud_point:
- [0.1, 0.2, 0.3]
- [1.1, 1.2, 1.3]
- [5.5, 6.6, 7.7]

View File

@ -14,6 +14,7 @@ namespace cloud_point_rpc {
* @param port Server Port
* @return int exit code
*/
int run_cli(std::istream& input, std::ostream& output, const std::string& ip, int port);
int run_cli(std::istream &input, std::ostream &output, const std::string &ip,
int port);
} // namespace cloud_point_rpc

View File

@ -1,10 +1,10 @@
#pragma once
#include <glog/logging.h>
#include <stdexcept>
#include <string>
#include <vector>
#include <yaml-cpp/yaml.h>
#include <glog/logging.h>
#include <stdexcept>
namespace cloud_point_rpc {
@ -33,7 +33,8 @@ class ConfigLoader {
// Server
if (config["server"]) {
c.server.ip = config["server"]["ip"].as<std::string>("127.0.0.1");
c.server.ip =
config["server"]["ip"].as<std::string>("127.0.0.1");
c.server.port = config["server"]["port"].as<int>(8080);
} else {
c.server.ip = "127.0.0.1";
@ -43,13 +44,18 @@ class ConfigLoader {
// Test Data
if (config["test_data"]) {
c.test_data.intrinsic_params = config["test_data"]["intrinsic_params"].as<std::vector<double>>();
c.test_data.extrinsic_params = config["test_data"]["extrinsic_params"].as<std::vector<double>>();
c.test_data.intrinsic_params =
config["test_data"]["intrinsic_params"]
.as<std::vector<double>>();
c.test_data.extrinsic_params =
config["test_data"]["extrinsic_params"]
.as<std::vector<double>>();
// Parse cloud_point (list of lists)
auto cp_node = config["test_data"]["cloud_point"];
for (const auto &point_node : cp_node) {
c.test_data.cloud_point.push_back(point_node.as<std::vector<double>>());
c.test_data.cloud_point.push_back(
point_node.as<std::vector<double>>());
}
} else {
LOG(WARNING) << "No 'test_data' section, using empty/defaults.";

View File

@ -1,82 +1,40 @@
#pragma once
#include <glog/logging.h>
#include <string>
#include <vector>
#include <nlohmann/json.hpp>
#include "tcp_connector.hpp"
#include <asio.hpp>
#include <stdexcept>
#include <glog/logging.h>
#include <jsonrpccxx/client.hpp>
#include <nlohmann/json.hpp>
#include <vector>
namespace cloud_point_rpc {
class RpcClient {
class RpcClient : public jsonrpccxx::JsonRpcClient {
public:
RpcClient() : socket_(io_context_) {}
void connect(const std::string& ip, int port) {
try {
LOG(INFO) << "Client connecting to " << ip << ":" << port;
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip), port);
socket_.connect(endpoint);
LOG(INFO) << "Client connected";
} catch (const std::exception& e) {
throw std::runtime_error(std::string("Connection Failed: ") + e.what());
}
}
RpcClient(TCPConnector &connector)
: jsonrpccxx::JsonRpcClient(connector, jsonrpccxx::version::v2) {}
[[nodiscard]] std::vector<double> get_intrinsic_params() {
return call("get-intrinsic-params")["result"].get<std::vector<double>>();
return call<std::vector<double>>("get-intrinsic-params");
}
[[nodiscard]] std::vector<double> get_extrinsic_params() {
return call("get-extrinsic-params")["result"].get<std::vector<double>>();
return call<std::vector<double>>("get-extrinsic-params");
}
[[nodiscard]] std::vector<std::vector<double>> get_cloud_point() {
return call("get-cloud-point")["result"].get<std::vector<std::vector<double>>>();
return call<std::vector<std::vector<double>>>("get-cloud-point");
}
[[nodiscard]] nlohmann::json call(const std::string& method, const nlohmann::json& params = nlohmann::json::object()) {
using json = nlohmann::json;
// Create Request
json request = {
{"jsonrpc", "2.0"},
{"method", method},
{"params", params},
{"id", ++id_counter_}
};
std::string request_str = request.dump();
// Send
LOG(INFO) << "Client sending: " << request_str;
asio::write(socket_, asio::buffer(request_str));
// Read Response
LOG(INFO) << "Client reading response...";
std::vector<char> buffer(65536);
asio::error_code ec;
size_t len = socket_.read_some(asio::buffer(buffer), ec);
if (ec) throw std::system_error(ec);
LOG(INFO) << "Client read " << len << " bytes";
json response = json::parse(std::string(buffer.data(), len));
if (response.contains("error")) {
throw std::runtime_error(response["error"]["message"].get<std::string>());
template <typename ReturnType>
[[nodiscard]] ReturnType call(std::string_view name) {
return this->CallMethod<ReturnType>(id++, name.data());
}
return response;
}
~RpcClient() {
// Socket closes automatically
}
~RpcClient() = default;
private:
asio::io_context io_context_;
asio::ip::tcp::socket socket_;
int id_counter_ = 0;
int id{0};
};
} // namespace cloud_point_rpc

View File

@ -0,0 +1,37 @@
#pragma once
#include <cstdint>
#include <cstring>
#include <string>
#include <type_traits>
#include <vector>
namespace cloud_point_rpc {
template <typename T>
concept NumericType = requires(T param) {
requires std::is_integral_v<T> || std::is_floating_point_v<T>;
requires !std::is_same_v<bool, T>;
requires std::is_arithmetic_v<decltype(param + 1)>;
requires !std::is_pointer_v<T>;
};
template <NumericType T> std::vector<uint8_t> serialize(const T &data) {
std::vector<uint8_t> buffer;
buffer.resize(sizeof(data));
std::memcpy(buffer.data(), &data, sizeof(data));
return buffer;
}
inline void inplace_size_embedding(std::string &str) {
auto size = str.size();
auto tmp = serialize(size);
str.insert(str.begin(), tmp.begin(), tmp.end());
}
template <NumericType T> T deserialize(const std::vector<uint8_t> &buffer) {
return *reinterpret_cast<const T *>(buffer.data());
}
} // namespace cloud_point_rpc

View File

@ -1,7 +1,7 @@
#pragma once
#include <vector>
#include "cloud_point_rpc/config.hpp"
#include <vector>
namespace cloud_point_rpc {

View File

@ -0,0 +1,80 @@
#pragma once
#include "cloud_point_rpc/serialize.hpp"
#include "jsonrpccxx/iclientconnector.hpp"
#include <asio.hpp>
#include <glog/logging.h>
#include <string>
namespace cloud_point_rpc {
/**
* TCPConnector main purpose is to implement jsonrpccxx::IClientConnector Send
* method As an internal implementation, TCPConnector adds to the beginning of
* the json "expected size" of a packet in case when the packet was for some
* reason on any level fractured.
*/
class TCPConnector : public jsonrpccxx::IClientConnector {
public:
TCPConnector(const std::string &ip, size_t port) noexcept(false)
: io_context_(), socket_(io_context_) {
try {
LOG(INFO) << "Client connecting to " << ip << ":" << port;
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip), port);
socket_.connect(endpoint);
LOG(INFO) << "Client connected";
} catch (const std::exception &e) {
throw std::runtime_error(std::string("Connection Failed: ") +
e.what());
}
}
std::string Send(const std::string &request) override {
// Send
LOG(INFO) << "Client sending: " << request
<< " Size: " << request.size();
auto remove_const = request;
inplace_size_embedding(remove_const);
asio::write(socket_, asio::buffer(remove_const));
return read();
}
protected:
std::string read() {
std::string result;
std::array<char, 8> header;
LOG(INFO) << "trying to read";
try {
size_t len =
asio::read(socket_, asio::buffer(header, header.size()));
if (len != sizeof(uint64_t)) {
LOG(ERROR) << "failed to read header";
return result;
}
std::vector<uint8_t> v(header.begin(), header.begin() + 8);
uint64_t packet_size = deserialize<uint64_t>(v);
LOG(INFO) << "Received packet of size: " << packet_size;
std::vector<char> payload(packet_size);
len = asio::read(socket_, asio::buffer(payload));
// if failed to fetch in one pack, try to read until size
while (len < packet_size) {
len += asio::read(socket_, asio::buffer(payload.data() + len,
payload.size() - len));
}
LOG(INFO) << std::format("Was able to read len={}", len);
result = std::string(payload.begin(), payload.end());
LOG(INFO) << "Payload: \n" << result;
} catch (const std::exception &e) {
LOG(WARNING) << "Connector handling error: " << e.what();
}
return result;
}
private:
asio::io_context io_context_;
asio::ip::tcp::socket socket_;
};
} // namespace cloud_point_rpc

View File

@ -1,11 +1,12 @@
#pragma once
#include <string>
#include "cloud_point_rpc/serialize.hpp"
#include <asio.hpp>
#include <atomic>
#include <functional>
#include <glog/logging.h>
#include <string>
#include <thread>
#include <atomic>
#include <asio.hpp>
namespace cloud_point_rpc {
@ -21,7 +22,8 @@ class TcpServer {
void start() {
try {
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_), port_);
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_),
port_);
acceptor_.open(endpoint.protocol());
acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
@ -34,10 +36,13 @@ class TcpServer {
LOG(INFO) << "Accept thread started";
while (running_) {
try {
auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
auto socket = std::make_shared<asio::ip::tcp::socket>(
io_context_);
acceptor_.accept(*socket);
LOG(INFO) << "New connection from " << socket->remote_endpoint().address().to_string();
LOG(INFO)
<< "New connection from "
<< socket->remote_endpoint().address().to_string();
std::jthread([this, socket]() {
handle_client(socket);
@ -58,19 +63,25 @@ class TcpServer {
}
void stop() {
if (!running_) return;
if (!running_)
return;
LOG(INFO) << "Stopping server...";
running_ = false;
// Closing acceptor unblocks accept() call usually, but sometimes we need to prod it
// Closing acceptor unblocks accept() call usually, but sometimes we
// need to prod it
asio::error_code ec;
acceptor_.close(ec);
std::ignore = acceptor_.close(ec);
if (ec.value()) {
LOG(ERROR) << std::format(
"acceptor closed with a value returned = {}", ec.value());
}
// Ensure accept unblocks by connecting a dummy socket
try {
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_), port_);
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_),
port_);
asio::ip::tcp::socket dummy_sock(io_context_);
asio::error_code connect_ec;
dummy_sock.connect(endpoint, connect_ec);
std::ignore = dummy_sock.connect(endpoint, connect_ec);
} catch (...) {
// Ignore
}
@ -88,31 +99,38 @@ class TcpServer {
private:
void handle_client(std::shared_ptr<asio::ip::tcp::socket> socket) {
try {
asio::streambuf buffer;
// Read until newline or EOF/error
// Note: This matches the client implementation which should send a newline
// However, previous implementation read 4096 bytes raw.
// Let's emulate "read some" to match previous simple behavior, or use read_until if we enforce framing.
// Given this is a prototype, let's read once.
LOG(INFO) << "Server reading from client...";
char data[4096];
size_t length = socket->read_some(asio::buffer(data, 4096)); // Error will throw
LOG(INFO) << "Server read " << length << " bytes";
if (length > 0) {
std::string request(data, length);
std::array<char, 8> header;
size_t header_length = asio::read(*socket, asio::buffer(header));
if (header_length != 8) {
LOG(WARNING) << "Invalid header length: " << header_length;
return;
}
uint64_t packet_size = *reinterpret_cast<uint64_t *>(header.data());
LOG(INFO) << "Expected packet size: " << packet_size;
std::vector<char> payload(packet_size);
size_t payload_length = asio::read(*socket, asio::buffer(payload));
LOG(INFO) << "Read payload length: " << payload_length;
if (payload_length != packet_size) {
LOG(WARNING) << "Payload length mismatch: expected "
<< packet_size << ", got " << payload_length;
return;
}
if (payload_length > 0) {
std::string request(payload.data(), payload_length);
LOG(INFO) << "Server processing request: " << request;
std::string response = processor_(request);
response += "\n";
LOG(INFO) << "Server sending response: " << response;
inplace_size_embedding(response);
asio::write(*socket, asio::buffer(response));
LOG(INFO) << "Server sent response";
}
} catch (const std::exception &e) {
LOG(WARNING) << "Client handling error: " << e.what();
}
// Socket closes when shared_ptr dies
}
std::string ip_;

View File

@ -1,7 +1,7 @@
#include "cloud_point_rpc/cli.hpp"
#include "cloud_point_rpc/rpc_client.hpp"
#include <glog/logging.h>
#include <iomanip>
#include <string>
namespace cloud_point_rpc {
@ -14,19 +14,39 @@ void print_menu(std::ostream& output) {
output << "Select an option: ";
}
int run_cli(std::istream& input, std::ostream& output, const std::string& ip, int port) {
template <typename T> std::string vector_to_string(const std::vector<T> &v) {
std::string result;
for (auto &el : v) {
result += std::to_string(el) + " ";
}
return result;
}
template <typename T>
std::string vector_to_string(const std::vector<std::vector<T>> &v) {
std::string result;
for (auto &el : v) {
result += vector_to_string(el) + "\n";
}
return result;
}
int run_cli(std::istream &input, std::ostream &output, const std::string &ip,
int port) {
try {
RpcClient client;
client.connect(ip, port);
TCPConnector connector(ip, port);
RpcClient client(connector);
output << "Connected to " << ip << ":" << port << std::endl;
std::string choice;
while (true) {
print_menu(output);
if (!(input >> choice)) break;
if (!(input >> choice))
break;
if (choice == "0") break;
if (choice == "0")
break;
std::string method;
if (choice == "1") {
@ -41,8 +61,19 @@ int run_cli(std::istream& input, std::ostream& output, const std::string& ip, in
}
try {
auto response = client.call(method);
output << "\nResponse:\n" << response.dump(4) << std::endl;
if (method == "get-intrinsic-params") {
auto response = client.get_intrinsic_params();
output << vector_to_string(response);
}
if (method == "get-extrinsic-params") {
auto response = client.get_extrinsic_params();
output << vector_to_string(response);
}
if (method == "get-cloud-point") {
auto response = client.get_cloud_point();
output << vector_to_string(response);
}
} catch (const std::exception &e) {
output << "\nRPC Error: " << e.what() << std::endl;
}

View File

@ -1,8 +1,8 @@
#include <iostream>
#include <fstream>
#include "cloud_point_rpc/cli.hpp"
#include "cloud_point_rpc/config.hpp"
#include <fstream>
#include <glog/logging.h>
#include <iostream>
int main(int argc, char *argv[]) {
google::InitGoogleLogging(argv[0]);
@ -17,14 +17,16 @@ int main(int argc, char* argv[]) {
std::ifstream f(config_path.c_str());
if (!f.good()) {
std::cerr << "Config file not found: " << config_path << std::endl;
std::cerr << "Please create config.yaml or provide path as argument." << std::endl;
std::cerr << "Please create config.yaml or provide path as argument."
<< std::endl;
return 1;
}
f.close();
try {
auto config = cloud_point_rpc::ConfigLoader::load(config_path);
return cloud_point_rpc::run_cli(std::cin, std::cout, config.server.ip, config.server.port);
return cloud_point_rpc::run_cli(std::cin, std::cout, config.server.ip,
config.server.port);
} catch (const std::exception &e) {
std::cerr << "Failed to start CLI: " << e.what() << std::endl;
return 1;

View File

@ -1,25 +1,19 @@
#include "cloud_point_rpc/rpc_server.hpp"
#include <iostream>
#include <glog/logging.h>
using json = nlohmann::json;
namespace cloud_point_rpc {
namespace {
json create_error(int code, const std::string& message, const json& id = nullptr) {
return {
{"jsonrpc", "2.0"},
json create_error(int code, const std::string &message,
const json &id = nullptr) {
return {{"jsonrpc", "2.0"},
{"error", {{"code", code}, {"message", message}}},
{"id", id}
};
{"id", id}};
}
json create_success(const json &result, const json &id) {
return {
{"jsonrpc", "2.0"},
{"result", result},
{"id", id}
};
return {{"jsonrpc", "2.0"}, {"result", result}, {"id", id}};
}
} // namespace
@ -32,13 +26,17 @@ std::string RpcServer::process(const std::string& request_str) {
try {
request = json::parse(request_str);
} catch (const json::parse_error &) {
LOG(ERROR) << "json parse error" << __func__;
return create_error(-32700, "Parse error").dump();
}
// Batch requests are not supported in this minimal version, assume single object
// Batch requests are not supported in this minimal version, assume single
// object
if (!request.contains("jsonrpc") || request["jsonrpc"] != "2.0" ||
!request.contains("method") || !request.contains("id")) {
return create_error(-32600, "Invalid Request", request.value("id", json(nullptr))).dump();
return create_error(-32600, "Invalid Request",
request.value("id", json(nullptr)))
.dump();
}
std::string method = request["method"];

View File

@ -1,10 +1,9 @@
#include <iostream>
#include <string>
#include <glog/logging.h>
#include "cloud_point_rpc/config.hpp"
#include "cloud_point_rpc/rpc_server.hpp"
#include "cloud_point_rpc/service.hpp"
#include "cloud_point_rpc/config.hpp"
#include "cloud_point_rpc/tcp_server.hpp"
#include <glog/logging.h>
#include <string>
using json = nlohmann::json;
@ -42,9 +41,9 @@ int main(int argc, char* argv[]) {
cloud_point_rpc::TcpServer server(config.server.ip, config.server.port,
[&](const std::string &request) {
return rpc_server.process(request);
}
);
return rpc_server.process(
request);
});
server.start();
server.join();

View File

@ -7,30 +7,22 @@ Service::Service(const TestData& data) : data_(data) {}
std::vector<double> Service::get_intrinsic_params() const {
if (data_.intrinsic_params.empty()) {
// Fallback if no config loaded
return {1.0, 0.0, 0.0,
0.0, 1.0, 0.0,
0.0, 0.0, 1.0};
return {1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0};
}
return data_.intrinsic_params;
}
std::vector<double> Service::get_extrinsic_params() const {
if (data_.extrinsic_params.empty()) {
return {1.0, 0.0, 0.0, 0.0,
0.0, 1.0, 0.0, 0.0,
0.0, 0.0, 1.0, 0.0,
0.0, 0.0, 0.0, 1.0};
return {1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0,
0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0};
}
return data_.extrinsic_params;
}
std::vector<std::vector<double>> Service::get_cloud_point() const {
if (data_.cloud_point.empty()) {
return {
{0.1, 0.2, 0.3},
{1.1, 1.2, 1.3},
{2.1, 2.2, 2.3}
};
return {{0.1, 0.2, 0.3}, {1.1, 1.2, 1.3}, {2.1, 2.2, 2.3}};
}
return data_.cloud_point;
}

View File

@ -1,7 +1,8 @@
test_sources = files(
'test_rpc.cpp',
'test_integration.cpp',
'test_cli.cpp'
'test_cli.cpp',
'test_tcp.cpp'
)
test_exe = executable('unit_tests',

View File

@ -1,14 +1,13 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <thread>
#include <chrono>
#include <sstream>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <iostream>
#include <future>
#include <sstream>
#include <thread>
#include "cloud_point_rpc/tcp_server.hpp"
#include "cloud_point_rpc/rpc_server.hpp"
#include "cloud_point_rpc/cli.hpp"
#include "cloud_point_rpc/rpc_server.hpp"
#include "cloud_point_rpc/tcp_server.hpp"
using namespace cloud_point_rpc;
@ -19,11 +18,11 @@ protected:
server_port = 9096;
rpc_server = std::make_unique<RpcServer>();
rpc_server->register_method("hello", [](const nlohmann::json& params) {
return "world";
});
rpc_server->register_method(
"hello", [](const nlohmann::json &) { return "world"; });
tcp_server = std::make_unique<TcpServer>(server_ip, server_port, [this](const std::string& req) {
tcp_server = std::make_unique<TcpServer>(
server_ip, server_port, [this](const std::string &req) {
return rpc_server->process(req);
});
@ -31,9 +30,7 @@ protected:
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
void TearDown() override {
tcp_server->stop();
}
void TearDown() override { tcp_server->stop(); }
std::string server_ip;
int server_port;
@ -48,8 +45,9 @@ TEST_F(CliTest, SendsInputToServerAndReceivesResponse) {
// Select option 1 (get-intrinsic-params) then 0 (exit)
// First we need to make sure the rpc_server has the method registered.
// Our SetUp registers "hello", let's register "get-intrinsic-params" too.
rpc_server->register_method("get-intrinsic-params", [](const nlohmann::json&) {
return std::vector<double>{1.0, 2.0, 3.0};
rpc_server->register_method(
"get-intrinsic-params", [](const nlohmann::json &) {
return std::vector<double>{14589.0, 22489.0, 3123124.555};
});
input << "1" << std::endl;
@ -60,8 +58,7 @@ TEST_F(CliTest, SendsInputToServerAndReceivesResponse) {
EXPECT_EQ(result, 0);
std::string response = output.str();
// Use more flexible check because of pretty printing
EXPECT_THAT(response, ::testing::HasSubstr("\"result\": ["));
EXPECT_THAT(response, ::testing::HasSubstr("1.0"));
EXPECT_THAT(response, ::testing::HasSubstr("2.0"));
EXPECT_THAT(response, ::testing::HasSubstr("3.0"));
EXPECT_THAT(response, ::testing::HasSubstr("14589.0"));
EXPECT_THAT(response, ::testing::HasSubstr("22489.0"));
EXPECT_THAT(response, ::testing::HasSubstr("3123124.555"));
}

View File

@ -1,13 +1,13 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <thread>
#include <chrono>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <thread>
#include "cloud_point_rpc/config.hpp"
#include "cloud_point_rpc/rpc_client.hpp"
#include "cloud_point_rpc/rpc_server.hpp"
#include "cloud_point_rpc/service.hpp"
#include "cloud_point_rpc/tcp_server.hpp"
#include "cloud_point_rpc/rpc_client.hpp"
#include <fstream>
@ -18,11 +18,13 @@ class IntegrationTest : public ::testing::Test {
void SetUp() override {
// Create a temporary config file for testing
std::ofstream config_file("config.yaml");
config_file << "server:\n"
config_file
<< "server:\n"
<< " ip: \"127.0.0.1\"\n"
<< " port: 9095\n"
<< "test_data:\n"
<< " intrinsic_params: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]\n"
<< " intrinsic_params: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, "
"9.0]\n"
<< " extrinsic_params: [1,0,0,0, 0,1,0,0, 0,0,1,0, 0,0,0,1]\n"
<< " cloud_point:\n"
<< " - [0.1, 0.2, 0.3]\n";
@ -32,27 +34,25 @@ class IntegrationTest : public ::testing::Test {
try {
config_ = ConfigLoader::load("config.yaml");
} catch (...) {
// If config fails, we can't proceed, but we should avoid crashing in TearDown
// If config fails, we can't proceed, but we should avoid crashing
// in TearDown
throw;
}
service_ = std::make_unique<Service>(config_.test_data);
rpc_server_ = std::make_unique<RpcServer>();
rpc_server_->register_method("get-intrinsic-params", [&](const nlohmann::json&) {
rpc_server_->register_method(
"get-intrinsic-params", [&](const nlohmann::json &) {
return service_->get_intrinsic_params();
});
// Start Server Thread
tcp_server_ = std::make_unique<TcpServer>(config_.server.ip, config_.server.port,
[&](const std::string& req) {
return rpc_server_->process(req);
}
);
tcp_server_ = std::make_unique<TcpServer>(
config_.server.ip, config_.server.port,
[&](const std::string &req) { return rpc_server_->process(req); });
server_thread_ = std::thread([&]() {
tcp_server_->start();
});
server_thread_ = std::thread([&]() { tcp_server_->start(); });
// Give server time to start
std::this_thread::sleep_for(std::chrono::milliseconds(200));
@ -77,13 +77,12 @@ class IntegrationTest : public ::testing::Test {
};
TEST_F(IntegrationTest, ClientCanConnectAndRetrieveValues) {
RpcClient client;
// Act: Connect
ASSERT_NO_THROW(client.connect(config_.server.ip, config_.server.port));
TCPConnector connector(config_.server.ip, config_.server.port);
RpcClient client(connector);
// Act: Call Method
auto params = client.get_intrinsic_params();
std::vector<double> params;
EXPECT_NO_THROW(params = client.get_intrinsic_params());
// Assert: Values match config
const auto &expected = config_.test_data.intrinsic_params;
@ -94,7 +93,5 @@ TEST_F(IntegrationTest, ClientCanConnectAndRetrieveValues) {
}
TEST_F(IntegrationTest, ClientHandlesConnectionError) {
RpcClient client;
// Try connecting to a closed port
EXPECT_THROW(client.connect("127.0.0.1", 9999), std::runtime_error);
EXPECT_THROW(TCPConnector connector("127.0.0.1", 9999), std::runtime_error);
}

View File

@ -1,10 +1,10 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <string>
#include <vector>
#include "cloud_point_rpc/rpc_server.hpp"
#include "cloud_point_rpc/service.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <nlohmann/json.hpp>
#include <string>
#include <vector>
using json = nlohmann::json;
using namespace cloud_point_rpc;
@ -22,7 +22,8 @@ class RpcServerTest : public ::testing::Test {
};
TEST_F(RpcServerTest, GetIntrinsicParamsReturnsMatrix) {
std::string request = R"({"jsonrpc": "2.0", "method": "get-intrinsic-params", "id": 1})";
std::string request =
R"({"jsonrpc": "2.0", "method": "get-intrinsic-params", "id": 1})";
std::string response_str = server.process(request);
json response = json::parse(response_str);
@ -40,7 +41,8 @@ TEST_F(RpcServerTest, GetIntrinsicParamsReturnsMatrix) {
}
TEST_F(RpcServerTest, MethodNotFoundReturnsError) {
std::string request = R"({"jsonrpc": "2.0", "method": "unknown-method", "id": 99})";
std::string request =
R"({"jsonrpc": "2.0", "method": "unknown-method", "id": 99})";
std::string response_str = server.process(request);
json response = json::parse(response_str);

58
tests/test_tcp.cpp Normal file
View File

@ -0,0 +1,58 @@
#include "cloud_point_rpc/serialize.hpp"
#include "cloud_point_rpc/tcp_connector.hpp"
#include "cloud_point_rpc/tcp_server.hpp"
#include <asio.hpp>
#include <cstdint>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <memory>
class TcpTest : public ::testing::Test {
public:
void ExpectedResponse(const std::string &expected_response) {
expected_ = expected_response;
}
protected:
void SetUp() override {
server_ = std::make_unique<cloud_point_rpc::TcpServer>(
"127.0.0.1", 12345, [this](const std::string &request) {
EXPECT_EQ(request, expected_);
std::string msg = "Echo: " + request;
auto v = cloud_point_rpc::serialize(msg.length());
std::string res(v.begin(), v.end());
res += msg;
return res;
});
server_->start();
}
void TearDown() override {
server_->stop();
server_.reset();
}
std::string expected_;
std::unique_ptr<cloud_point_rpc::TcpServer> server_;
};
TEST(SerializeTest, Base) {
uint64_t value{123};
auto res = cloud_point_rpc::serialize(value);
EXPECT_EQ(value, cloud_point_rpc::deserialize<uint64_t>(res));
}
TEST_F(TcpTest, EchoTest) {
constexpr std::string_view msg = "Hello, TCP Server!";
ExpectedResponse(msg.data());
cloud_point_rpc::TCPConnector connector("127.0.0.1", 12345);
auto res = connector.Send(msg.data());
}
TEST_F(TcpTest, HugeBuffer) {
static constexpr uint64_t w = 1920, h = 1080, c = 3;
std::string data(w * h * c, 77);
ExpectedResponse(data);
cloud_point_rpc::TCPConnector connector("127.0.0.1", 12345);
auto res = connector.Send(data);
}