Compare commits

..

1 Commits

Author SHA1 Message Date
Artur Mukhamadiev
886e3f1879 [jsonrpccxx] moved to external rpc impl
Detailed:
  As base used jsonrpccxx implementation paired with TCP socket
  TCP socket updated to handle dynamic sized buffers
  TCP communication protocol changed to serialized packet size after
  which json string is presented
2026-02-20 17:23:30 +03:00
36 changed files with 195 additions and 980 deletions

View File

@ -1,34 +0,0 @@
name: Verification
run-name: ${{ gitea.actor }} runs verification of the project
on:
push:
branches:
- 'master'
jobs:
Is-Buildable:
runs-on: ubuntu-latest
steps:
- name: Ensure That Build system available
run: |
apt update
DEBIAN_FRONTEND=noninteractive apt install -y cmake make ninja-build gcc
- name: Prepare Environment
run: |
python3 -m venv $HOME/py/
source $HOME/py/bin/activate
echo PATH=$PATH >> $GITEA_ENV
pip install meson
- name: Check out repository code
uses: actions/checkout@v4
with:
submodules: 'true'
- name: Build project
run: |
cd ${{ gitea.workspace }}
meson setup build
meson compile -C build
- name: Unit Test Results
run: |
meson test -C build
- run: echo "🍏 This job's status is ${{ job.status }}."

4
.gitignore vendored
View File

@ -5,6 +5,4 @@ subprojects/glog/
subprojects/googletest-*
subprojects/nlohmann_json/
subprojects/packagecache/
subprojects/yaml-cpp-0.8.0
subprojects/base64-0.5.2/
.venv/
subprojects/yaml-cpp-0.8.0

116
README.md
View File

@ -2,84 +2,118 @@
Communication JSON RPC protocol and implementation with Unity Scene.
## TODO
- [x] Server implementation with C-API for Unity
- [ ] Client correct implementation with OpenCV
## API Documentation
See [API.md](API.md) for detailed request/response formats.
## Comm model
```plantuml
@startuml
box ClientProcess #LightBlue
Participant Caller
Participant CloudPointClient
Participant TCPClient
end box
box UnityProcess #LightGreen
Participant TCPServer
Participant CloudPointServer
Participant UnityWorld
end box
UnityWorld -> CloudPointServer : init thread
activate CloudPointServer
CloudPointServer -> TCPServer : await for connection
activate TCPServer
->CloudPointClient : init thread
activate CloudPointClient
CloudPointClient -> TCPClient : createConnection
TCPClient -> TCPServer : establish connection
TCPServer -> CloudPointServer : established
deactivate TCPServer
CloudPointServer -> TCPServer : await for calls
TCPServer -> TCPServer : await for packet
Caller -> CloudPointClient : I want something
activate CloudPointClient
CloudPointClient -> CloudPointClient : CallMethod<Something>
CloudPointClient -> TCPClient : send(message)
activate TCPClient
TCPClient -> TCPServer : packet send
TCPServer -> TCPServer : await for packet
activate TCPServer
TCPServer -> TCPServer : read packet
TCPServer -> TCPClient : packet read
TCPClient -> CloudPointClient : done
deactivate TCPClient
CloudPointClient -> TCPClient : await for response
activate TCPClient
TCPClient -> TCPClient : await for packet
TCPServer -> CloudPointServer : callMethod
activate CloudPointServer
CloudPointServer -> UnityWorld : addToStaticQueue
UnityWorld -> UnityWorld : read from queue
activate UnityWorld
UnityWorld -> UnityWorld : callMethod
UnityWorld -> CloudPointServer: set task return value
deactivate UnityWorld
CloudPointServer -> TCPServer : return task
deactivate CloudPointServer
TCPServer -> TCPClient : send response
TCPClient -> TCPServer : response read
TCPClient -> CloudPointClient : response received
TCPServer -> CloudPointServer : done
deactivate TCPServer
CloudPointClient -> Caller : here what you wanted
deactivate CloudPointClient
Caller -> CloudPointClient : destruct
CloudPointClient -> TCPClient : finish waiting
deactivate TCPClient
deactivate CloudPointClient
UnityWorld -> CloudPointServer : destruct
deactivate CloudPointServer
@enduml
```
## Development
The project uses **Meson** build system and **C++20**.
### Dependencies
- Meson, Ninja
- GCC/Clang (C++20 support)
- Git (for subprojects)
### Build & Run
```bash
git submodule init
git submodule update
meson setup build
meson compile -C build
./build/src/cloud_point_rpc_server config.yaml
```
#### Build on windows
It's assumed that you have `GCC` and `make`/`ninja` installed on your system (and available in `PATH`)
```powershell
## FIRST OF ALL!
git submodule init
git submodule update
# Next python:
python3 -m venv .\venv
.\.venv\Scripts\Activate.ps1
pip install meson cmake
meson setup -Ddefault_library=static build
meson compile -C build
# To correctly get dlls:
meson devenv -C build
## .\build\tests\unit_tests < for dummy test
## .\build\src\.. < produced execs and libs
```
### Testing
```bash
meson test -C build -v
```
## Docker
You can build and run the cli using `Docker`.
You can build and run the cli using Docker.
### 1. Build Image
```bash
docker build -t cloud-point-rpc .
```
### 2. Run Container
The cli will try to connect to a **running server** on ip and port defined in config.yml file. (defined in `config.yaml` inside the image).
For simplicity, it's better to use a host network, so you will not have any headache with accessability.
The cli will try to connect to a **running server** on ip and port defined in config.yml file. (defined in `config.yaml` inside the image).
For simplicity, it's better to use a host network, so you will not have any headache with accessability.
> _Server is not configured to run through container, if you need, contact me_
> *Server is not configured to run through container, if you need, contact me*
You also can mount your own `config.yaml` to override the default settings:
```bash
docker run --network=host -it -v $(pwd)/my_config.yaml:/app/config.yaml cloud-point-rpc
```
## Communication model
![Communicatoin model plantuml diagram](docs/cm.png)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 115 KiB

View File

@ -1,66 +0,0 @@
@startuml comm
box ClientProcess #LightBlue
Participant Caller
Participant CloudPointClient
Participant TCPClient
end box
box UnityProcess #LightGreen
Participant TCPServer
Participant CloudPointServer
Participant UnityWorld
end box
UnityWorld -> CloudPointServer : init thread
activate CloudPointServer
CloudPointServer -> TCPServer : await for connection
activate TCPServer
->CloudPointClient : init thread
activate CloudPointClient
CloudPointClient -> TCPClient : createConnection
TCPClient -> TCPServer : establish connection
TCPServer -> CloudPointServer : established
deactivate TCPServer
CloudPointServer -> TCPServer : await for calls
TCPServer -> TCPServer : await for packet
Caller -> CloudPointClient : I want something
activate CloudPointClient
CloudPointClient -> CloudPointClient : CallMethod<Something>
CloudPointClient -> TCPClient : send(message)
activate TCPClient
TCPClient -> TCPServer : packet send
TCPServer -> TCPServer : await for packet
activate TCPServer
TCPServer -> TCPServer : read packet
TCPServer -> TCPClient : packet read
TCPClient -> CloudPointClient : done
deactivate TCPClient
CloudPointClient -> TCPClient : await for response
activate TCPClient
TCPClient -> TCPClient : await for packet
TCPServer -> CloudPointServer : callMethod
activate CloudPointServer
CloudPointServer -> UnityWorld : addToStaticQueue
UnityWorld -> UnityWorld : read from queue
activate UnityWorld
UnityWorld -> UnityWorld : callMethod
UnityWorld -> CloudPointServer: set task return value
deactivate UnityWorld
CloudPointServer -> TCPServer : return task
deactivate CloudPointServer
TCPServer -> TCPClient : send response
TCPClient -> TCPServer : response read
TCPClient -> CloudPointClient : response received
TCPServer -> CloudPointServer : done
deactivate TCPServer
CloudPointClient -> Caller : here what you wanted
deactivate CloudPointClient
Caller -> CloudPointClient : destruct
CloudPointClient -> TCPClient : finish waiting
deactivate TCPClient
deactivate CloudPointClient
UnityWorld -> CloudPointServer : destruct
deactivate CloudPointServer
@enduml

View File

@ -2,8 +2,8 @@
#include <iostream>
#include <string>
#include "export.h"
namespace score {
namespace cloud_point_rpc {
/**
* @brief Runs the CLI client.
@ -14,7 +14,7 @@ namespace score {
* @param port Server Port
* @return int exit code
*/
int CRPC_EXPORT run_cli(std::istream &input, std::ostream &output, const std::string &ip,
int run_cli(std::istream &input, std::ostream &output, const std::string &ip,
int port);
} // namespace cloud_point_rpc

View File

@ -6,11 +6,11 @@
#include <vector>
#include <yaml-cpp/yaml.h>
namespace score {
namespace cloud_point_rpc {
struct ServerConfig {
std::string ip;
int port{0};
int port;
};
struct TestData {

View File

@ -6,7 +6,8 @@
#include <jsonrpccxx/client.hpp>
#include <nlohmann/json.hpp>
#include <vector>
namespace score {
namespace cloud_point_rpc {
class RpcClient : public jsonrpccxx::JsonRpcClient {
public:

View File

@ -1,27 +0,0 @@
//
// Created by vptyp on 11.03.2026.
//
#pragma once
#include <string>
#include <vector>
namespace score {
class IRPCCoder {
public:
virtual ~IRPCCoder() = default;
virtual std::vector<char> decode(const std::string& encoded) = 0;
virtual std::string encode(const std::vector<char>& data) = 0;
};
class Base64RPCCoder final : public IRPCCoder {
public:
Base64RPCCoder();
~Base64RPCCoder() override;
std::vector<char> decode(const std::string& encoded) override;
std::string encode(const std::vector<char>& data) override;
};
}

View File

@ -5,27 +5,14 @@
#include <map>
#include <nlohmann/json.hpp>
#include <string>
#include "export.h"
extern "C" {
struct rpc_string {
rpc_string(const char* data, uint64_t size) : s(data,size) {}
rpc_string() = default;
namespace cloud_point_rpc {
std::string s;
};
}
namespace score {
class CRPC_EXPORT RpcServer {
class RpcServer {
public:
using Handler = std::function<nlohmann::json(const nlohmann::json &)>;
using callback_t = rpc_string*(*)(rpc_string*);
void register_method(const std::string &name, Handler handler);
void register_method(const std::string &name, callback_t handler);
///@param request_str json rpc 2.0 formatted string
[[nodiscard]] std::string process(const std::string &request_str);
private:

View File

@ -5,7 +5,7 @@
#include <type_traits>
#include <vector>
namespace score {
namespace cloud_point_rpc {
template <typename T>
concept NumericType = requires(T param) {

View File

@ -2,10 +2,10 @@
#include "cloud_point_rpc/config.hpp"
#include <vector>
#include "export.h"
namespace score {
class CRPC_EXPORT Service {
namespace cloud_point_rpc {
class Service {
public:
explicit Service(const TestData &data = {});

View File

@ -2,18 +2,17 @@
#include "cloud_point_rpc/serialize.hpp"
#include "jsonrpccxx/iclientconnector.hpp"
#include <asio.hpp>
#include <cloud_point_rpc/tcp_read.hpp>
#include <glog/logging.h>
#include <string>
#include "export.h"
namespace score {
namespace cloud_point_rpc {
/**
* TCPConnector main purpose is to implement jsonrpccxx::IClientConnector Send
* method As an internal implementation, TCPConnector adds to the beginning of
* the json "expected size" of a packet in case when the packet was for some
* reason on any level fractured.
*/
class CRPC_EXPORT TCPConnector : public jsonrpccxx::IClientConnector {
class TCPConnector : public jsonrpccxx::IClientConnector {
public:
TCPConnector(const std::string &ip, size_t port) noexcept(false)
: io_context_(), socket_(io_context_) {
@ -35,7 +34,42 @@ class CRPC_EXPORT TCPConnector : public jsonrpccxx::IClientConnector {
auto remove_const = request;
inplace_size_embedding(remove_const);
asio::write(socket_, asio::buffer(remove_const));
return tcp_read(socket_, "TCPConnector] ");
return read();
}
protected:
std::string read() {
std::string result;
std::array<char, 8> header;
LOG(INFO) << "trying to read";
try {
size_t len =
asio::read(socket_, asio::buffer(header, header.size()));
if (len != sizeof(uint64_t)) {
LOG(ERROR) << "failed to read header";
return result;
}
std::vector<uint8_t> v(header.begin(), header.begin() + 8);
uint64_t packet_size = deserialize<uint64_t>(v);
LOG(INFO) << "Received packet of size: " << packet_size;
std::vector<char> payload(packet_size);
len = asio::read(socket_, asio::buffer(payload));
// if failed to fetch in one pack, try to read until size
while (len < packet_size) {
len += asio::read(socket_, asio::buffer(payload.data() + len,
payload.size() - len));
}
LOG(INFO) << std::format("Was able to read len={}", len);
result = std::string(payload.begin(), payload.end());
LOG(INFO) << "Payload: \n" << result;
} catch (const std::exception &e) {
LOG(WARNING) << "Connector handling error: " << e.what();
}
return result;
}
private:

View File

@ -1,43 +0,0 @@
#pragma once
#include <array>
#include <asio.hpp>
#include <cloud_point_rpc/serialize.hpp>
#include <glog/logging.h>
namespace score {
static inline std::string tcp_read(asio::ip::tcp::socket &socket,
std::string_view prefix) {
std::string result;
std::array<char, 8> header;
LOG(INFO) << prefix.data() << "trying to read";
try {
size_t len = asio::read(socket, asio::buffer(header, header.size()));
if (len != sizeof(uint64_t)) {
LOG(ERROR) << prefix.data() << "failed to read header";
return result;
}
std::vector<uint8_t> v(header.begin(), header.begin() + 8);
uint64_t packet_size = deserialize<uint64_t>(v);
LOG(INFO) << prefix.data()
<< "Received packet of size: " << packet_size;
std::vector<char> payload(packet_size);
len = asio::read(socket, asio::buffer(payload));
// if failed to fetch in one pack, try to read until size
while (len < packet_size) {
len += asio::read(socket, asio::buffer(payload.data() + len,
payload.size() - len));
}
LOG(INFO) << prefix.data()
<< std::format("Was able to read len={}", len);
result = std::string(payload.begin(), payload.end());
DLOG(INFO) << prefix.data() << "Payload: \n" << result;
} catch (const std::exception &e) {
LOG(WARNING) << prefix.data() << "handling error: " << e.what();
}
return result;
}
} // namespace cloud_point_rpc

View File

@ -1,18 +1,16 @@
#pragma once
#include "cloud_point_rpc/serialize.hpp"
#include <asio.hpp>
#include <atomic>
#include <cloud_point_rpc/tcp_read.hpp>
#include <functional>
#include <glog/logging.h>
#include <string>
#include <thread>
#include "export.h"
#include <list>
#include <ranges>
namespace score {
class CRPC_EXPORT TcpServer {
namespace cloud_point_rpc {
class TcpServer {
public:
using RequestProcessor = std::function<std::string(const std::string &)>;
@ -20,15 +18,9 @@ class CRPC_EXPORT TcpServer {
: ip_(ip), port_(port), processor_(std::move(processor)),
acceptor_(io_context_), running_(false) {}
~TcpServer() {
stop();
for (auto &thread : client_threads | std::views::keys) {
thread.join();
}
}
~TcpServer() { stop(); }
void start() {
using namespace std::chrono_literals;
try {
asio::ip::tcp::endpoint endpoint(asio::ip::make_address(ip_),
port_);
@ -43,14 +35,6 @@ class CRPC_EXPORT TcpServer {
accept_thread_ = std::jthread([this]() {
LOG(INFO) << "Accept thread started";
while (running_) {
std::ranges::remove_if(client_threads.begin(), client_threads.end(), [](auto& client_info) {
bool result = false;
if (client_info.second.wait_for(0ms) == std::future_status::ready) {
client_info.first.join();
result = true;
}
return result;
});
try {
auto socket = std::make_shared<asio::ip::tcp::socket>(
io_context_);
@ -59,11 +43,10 @@ class CRPC_EXPORT TcpServer {
LOG(INFO)
<< "New connection from "
<< socket->remote_endpoint().address().to_string();
auto done = std::make_shared<std::promise<bool>>();
client_threads.push_back(std::make_pair(std::jthread([this, socket, done]() {
std::jthread([this, socket]() {
handle_client(socket);
done->set_value(true);
}),done->get_future()));
}).detach();
} catch (const std::system_error &e) {
LOG(INFO) << "Accept exception: " << e.what();
if (running_) {
@ -115,14 +98,32 @@ class CRPC_EXPORT TcpServer {
private:
void handle_client(std::shared_ptr<asio::ip::tcp::socket> socket) {
LOG(INFO) << "Server reading from client...";
try {
auto payload = tcp_read(*socket, "TCPServer] ");
size_t payload_length = payload.size();
LOG(INFO) << "Server reading from client...";
std::array<char, 8> header;
size_t header_length = asio::read(*socket, asio::buffer(header));
if (header_length != 8) {
LOG(WARNING) << "Invalid header length: " << header_length;
return;
}
uint64_t packet_size = *reinterpret_cast<uint64_t *>(header.data());
LOG(INFO) << "Expected packet size: " << packet_size;
std::vector<char> payload(packet_size);
size_t payload_length = asio::read(*socket, asio::buffer(payload));
LOG(INFO) << "Read payload length: " << payload_length;
if (payload_length != packet_size) {
LOG(WARNING) << "Payload length mismatch: expected "
<< packet_size << ", got " << payload_length;
return;
}
if (payload_length > 0) {
std::string response = processor_(payload);
std::string request(payload.data(), payload_length);
LOG(INFO) << "Server processing request: " << request;
std::string response = processor_(request);
response += "\n";
DLOG(INFO) << "Server sending response: " << response;
LOG(INFO) << "Server sending response: " << response;
inplace_size_embedding(response);
asio::write(*socket, asio::buffer(response));
LOG(INFO) << "Server sent response";
@ -140,7 +141,6 @@ class CRPC_EXPORT TcpServer {
asio::ip::tcp::acceptor acceptor_;
std::atomic<bool> running_;
std::list<std::pair<std::jthread, std::future<bool>>> client_threads;
std::jthread accept_thread_;
};

View File

@ -1,14 +0,0 @@
#pragma once
#if defined(_WIN32)
#ifdef CRPC_SERVER_API_EXPORT
#define CRPC_EXPORT __declspec(dllexport)
#else
#define CRPC_EXPORT __declspec(dllimport)
#endif
#elif defined(__GNUC__) // GCC, Clang, etc.
// Linux, macOS, etc.
#define CRPC_EXPORT __attribute__((visibility("default")))
#else
#define CRPC_EXPORT
#pragma warning Unknown dynamic link import/export semantics.
#endif

View File

@ -1,38 +0,0 @@
#ifndef CRPC_SERVER_API
#define CRPC_SERVER_API
#include <cstdint>
#include "export.h"
#ifdef __cplusplus
extern "C" {
#endif //cpp
/**
* @brief basically just std::string wrapper:
* struct rpc_string {
* std::string s;
* };
* has internal gc and would be automatically deallocated on deinit call,
* but it is better to call destroy manually, to prevent exceeding memory usage
*/
struct CRPC_EXPORT rpc_string;
CRPC_EXPORT const char* crpc_str_get_data(const rpc_string*);
CRPC_EXPORT uint64_t crpc_str_get_size(const rpc_string*);
CRPC_EXPORT rpc_string* crpc_str_create(const char* data, uint64_t size);
CRPC_EXPORT void crpc_str_destroy(rpc_string*);
typedef rpc_string*(*callback_t)(rpc_string*);
CRPC_EXPORT void crpc_init(const char* config_path);
CRPC_EXPORT void crpc_deinit();
CRPC_EXPORT void crpc_add_method(callback_t cb, rpc_string* name);
#ifdef __cplusplus
}
#endif //cpp
#endif //CRPC_SERVER_API

View File

@ -1,50 +0,0 @@
#ifndef CRPC_TEST_API
#define CRPC_TEST_API
#include <cstdint>
#include "export.h"
#ifdef __cplusplus
extern "C" {
#endif //cpp
struct CRPC_EXPORT rpc_string;
/// @brief callback function intendent to handle json and retrieve string data
typedef rpc_string*(*callback_t)(rpc_string*);
/// @brief call it to initialize library thread;
/// by default would call available methods one by one
CRPC_EXPORT void crpc_test_init();
/// @brief please, call it, when you're done working with the lib
/// @note blocking operation
CRPC_EXPORT void crpc_test_deinit();
/// @brief crpc_test_add_method will add to the available set/map
/// @note blocking operation
CRPC_EXPORT void crpc_test_add_method(callback_t cb, rpc_string* name);
/// @brief crpc_test_remove_method would
/// @param name pointer to name string; memory management not transferred
CRPC_EXPORT int crpc_test_remove_method(rpc_string* name);
/// @brief crpc_schedule_call would add to the execution queue
/// may be called one after current
/// @note blocking operation
CRPC_EXPORT void crpc_test_schedule_call(rpc_string* name);
/// @brief crpc_test_change_duration will change duration between regular calls
/// @note blocking operation
CRPC_EXPORT void crpc_test_change_duration(uint64_t duration_ms);
/// @note blocking operation
CRPC_EXPORT uint64_t crpc_test_duration();
/// @brief crpc_test_auto_call change internal state of auto call
/// @param state 0 - turn off auto call on sleep; 1 - turn on auto call on sleep
CRPC_EXPORT void crpc_test_auto_call(uint32_t state);
#ifdef __cplusplus
}
#endif //cpp
#endif //CRPC_SERVER_API

View File

@ -6,23 +6,11 @@ project('cloud_point_rpc', 'cpp',
json_dep = dependency('nlohmann_json', fallback : ['nlohmann_json', 'nlohmann_json_dep'])
thread_dep = dependency('threads')
asio_dep = dependency('asio', fallback : ['asio', 'asio_dep'])
base64_dep = dependency('base64', fallback: ['aklomp-base64', 'base64'])
# GLog via CMake fallback
cmake = import('cmake')
glog_opt = cmake.subproject_options()
glog_opt.add_cmake_defines({
'WITH_GFLAGS': 'OFF',
'WITH_GTEST': 'OFF',
})
libtype = get_option('default_library')
if libtype == 'static'
message('Will share static state with glog')
glog_opt.add_cmake_defines({
'BUILD_SHARED_LIBS': 'OFF',
})
endif
glog_opt.add_cmake_defines({'WITH_GFLAGS': 'OFF', 'WITH_GTEST': 'OFF'})
glog_proj = cmake.subproject('glog', options: glog_opt)
glog_dep = glog_proj.dependency('glog')
@ -38,18 +26,3 @@ inc = include_directories(['include', 'rpc/include'])
subdir('src')
subdir('tests')
if host_machine.system() == 'windows' and libtype == 'shared'
message('We are on Windows, so to prevent our ass from pain')
devenv = environment()
message('By the way, I hate windows :O')
prefixed = meson.global_build_root() / 'subprojects'
message('Prefixed dir is: ', prefixed)
devenv.append('PATH', prefixed / 'glog')
devenv.append('PATH', prefixed / 'googletest-1.17.0' / 'googletest')
devenv.append('PATH', prefixed / 'googletest-1.17.0' / 'googlemock')
devenv.append('PATH', prefixed / 'yaml-cpp-0.8.0')
meson.add_devenv(devenv)
endif

View File

@ -3,7 +3,7 @@
#include <glog/logging.h>
#include <string>
namespace score {
namespace cloud_point_rpc {
void print_menu(std::ostream &output) {
output << "\n=== Cloud Point RPC CLI ===" << std::endl;

View File

@ -24,8 +24,8 @@ int main(int argc, char *argv[]) {
f.close();
try {
auto config = score::ConfigLoader::load(config_path);
return score::run_cli(std::cin, std::cout, config.server.ip,
auto config = cloud_point_rpc::ConfigLoader::load(config_path);
return cloud_point_rpc::run_cli(std::cin, std::cout, config.server.ip,
config.server.port);
} catch (const std::exception &e) {
std::cerr << "Failed to start CLI: " << e.what() << std::endl;

View File

@ -1,56 +1,28 @@
add_project_arguments('-DCRPC_SERVER_API_EXPORT -pthread', language: 'cpp')
cloud_point_rpc_sources = files(
'rpc_server.cpp',
'service.cpp',
'server_api.cpp',
'rpc_coder.cpp'
'cli.cpp'
)
libcloud_point_rpc = shared_library('cloud_point_rpc',
libcloud_point_rpc = static_library('cloud_point_rpc',
cloud_point_rpc_sources,
include_directories : inc,
dependencies : [json_dep, thread_dep, glog_dep, yaml_dep, asio_dep, base64_dep],
dependencies : [json_dep, thread_dep, glog_dep, yaml_dep, asio_dep],
install : true)
cloud_point_rpc_dep = declare_dependency(
include_directories : inc,
link_with : libcloud_point_rpc,
dependencies : [json_dep, glog_dep, yaml_dep, asio_dep, base64_dep])
# Test lib
libcloud_point_rpc_test = shared_library('test_cloud_point',
'test_api.cpp',
dependencies: cloud_point_rpc_dep,
install : true)
cloud_point_rpc_test_dep = declare_dependency(
include_directories: inc,
link_with: libcloud_point_rpc_test,
dependencies: [cloud_point_rpc_dep]
)
libcloud_point_rpc_cli = shared_library('libcloud_point_rpc_cli',
'cli.cpp',
include_directories : inc,
dependencies : [cloud_point_rpc_dep],
install : true)
cloud_point_rpc_cli_dep = declare_dependency(
include_directories: inc,
link_with: libcloud_point_rpc_cli,
dependencies: [cloud_point_rpc_dep]
)
dependencies : [json_dep, glog_dep, yaml_dep, asio_dep])
# Client/CLI tool (legacy stdin/stdout)
executable('cloud_point_rpc_cli',
['main.cpp', ],
dependencies : cloud_point_rpc_cli_dep,
'main.cpp',
dependencies : cloud_point_rpc_dep,
install : true)
# Server executable (TCP)
executable('cloud_point_rpc_server',
'server_main.cpp',
dependencies : cloud_point_rpc_dep,
link_args : '-pthread',
install : true)

View File

@ -1,42 +0,0 @@
//
// Created by vptyp on 11.03.2026.
//
#include "cloud_point_rpc/rpc_coder.h"
#include "libbase64.h"
#include <glog/logging.h>
namespace score {
Base64RPCCoder::Base64RPCCoder() = default;
Base64RPCCoder::~Base64RPCCoder() = default;
/**
* Tries to decode ASCII complained string to the
* @param encoded ASCII complained base64 encoded string
* @return vector of raw bytes << allocated on encoded.size() / 4 * 3 + 1 size
*/
std::vector<char> Base64RPCCoder::decode(const std::string& encoded) {
DLOG(INFO) << "Base64RPCCoder::decode";
std::vector<char> result((encoded.length() >> 2) * 3 + 1);
size_t result_len = 0;
base64_decode(encoded.data(), encoded.size(),
result.data(), &result_len, 0);
DLOG(INFO) << "result_len: " << result_len;
return result;
}
/**
*
* @param data raw byte stream
* @return encoded base64 string
*/
std::string Base64RPCCoder::encode(const std::vector<char>& data) {
DLOG(INFO) << "Base64RPCCoder::encode";
size_t result_len = 0;
std::string result(data.size() / 3 * 4 + 1, 0);
base64_encode(data.data(), data.size(),
result.data(), &result_len, 0
);
DLOG(INFO) << "result_len: " << result_len;
return result;
}
}

View File

@ -2,7 +2,7 @@
#include <glog/logging.h>
using json = nlohmann::json;
namespace score {
namespace cloud_point_rpc {
namespace {
json create_error(int code, const std::string &message,
@ -21,22 +21,12 @@ void RpcServer::register_method(const std::string &name, Handler handler) {
handlers_[name] = std::move(handler);
}
void RpcServer::register_method(const std::string& name, callback_t handler) {
handlers_[name] = [handler](const nlohmann::json& j) -> nlohmann::json {
rpc_string tmp;
tmp.s = j.dump();
rpc_string* res = handler(&tmp);
return {res->s};
};
}
std::string RpcServer::process(const std::string &request_str) {
LOG(INFO) << request_str;
json request;
try {
request = json::parse(request_str);
} catch (const json::parse_error &) {
LOG(ERROR) << "json parse error; " << __func__;
LOG(ERROR) << "json parse error" << __func__;
return create_error(-32700, "Parse error").dump();
}

View File

@ -1,68 +0,0 @@
#include "cloud_point_rpc/config.hpp"
#include "cloud_point_rpc/rpc_server.hpp"
#include "cloud_point_rpc/tcp_server.hpp"
#include <glog/logging.h>
#include "server_api.h"
#include <algorithm>
#include <memory>
#include <string>
#include <list>
static std::list<std::unique_ptr<rpc_string>> gc;
score::RpcServer rpc_server;
std::unique_ptr<score::TcpServer> server = nullptr;
extern "C" {
const char* crpc_str_get_data(const rpc_string* that) {
return that->s.c_str();
}
uint64_t crpc_str_get_size(const rpc_string* that){
return that->s.size();
}
rpc_string* crpc_str_create(const char* data, uint64_t size){
gc.push_back(std::make_unique<rpc_string>(data, size));
return gc.back().get();
}
void crpc_str_destroy(rpc_string* that){
auto it = std::ranges::find(gc, that, &std::unique_ptr<rpc_string>::get);
if(it != gc.end())
gc.erase(it);
}
void crpc_init(const char* config_path) {
google::InitGoogleLogging("CloudPointRPC");
if(config_path == nullptr) {
LOG(INFO) << "config_path was not provided";
}
try {
auto config = score::ConfigLoader::load(config_path);
LOG(INFO) << "Loaded config from " << config_path;
server = std::make_unique<score::TcpServer>(config.server.ip, config.server.port,
[&](const std::string &request) {
return rpc_server.process(
request);
});
server->start();
} catch (const std::exception &e) {
LOG(ERROR) << "Fatal error: " << e.what();
}
}
void crpc_deinit() {
if(server)
server->join();
server.reset();
gc.clear();
}
void crpc_add_method(callback_t cb, rpc_string* name) {
rpc_server.register_method(name->s, cb);
}
}

View File

@ -20,12 +20,12 @@ int main(int argc, char *argv[]) {
LOG(INFO) << "Starting Cloud Point RPC Server (Test Mock)...";
try {
auto config = score::ConfigLoader::load(config_path);
auto config = cloud_point_rpc::ConfigLoader::load(config_path);
LOG(INFO) << "Loaded config from " << config_path;
// Inject test data into service
score::Service service(config.test_data);
score::RpcServer rpc_server;
cloud_point_rpc::Service service(config.test_data);
cloud_point_rpc::RpcServer rpc_server;
rpc_server.register_method("get-intrinsic-params", [&](const json &) {
return service.get_intrinsic_params();
@ -39,7 +39,7 @@ int main(int argc, char *argv[]) {
return service.get_cloud_point();
});
score::TcpServer server(config.server.ip, config.server.port,
cloud_point_rpc::TcpServer server(config.server.ip, config.server.port,
[&](const std::string &request) {
return rpc_server.process(
request);

View File

@ -1,6 +1,6 @@
#include "cloud_point_rpc/service.hpp"
namespace score {
namespace cloud_point_rpc {
Service::Service(const TestData &data) : data_(data) {}

View File

@ -1,174 +0,0 @@
#include "test_api.h"
#include "cloud_point_rpc/rpc_server.hpp"
#include "server_api.h"
#include <condition_variable>
#include <glog/logging.h>
#include <list>
#include <mutex>
#include <queue>
#include <set>
#include <stop_token>
#include <thread>
class TestThread {
static std::string make_jsonrpc(const std::string &method_name) {
return std::format(R"(
{{
"jsonrpc": "2.0",
"method": "{}",
"params": {{}},
"id": 1
}}
)",
method_name);
}
public:
TestThread() = default;
~TestThread() { join(); }
void routine() {
size_t distance{0};
std::unique_lock lock(mtx);
const std::stop_token stoken = thr.get_stop_token();
lock.unlock();
while (!stoken.stop_requested()) {
lock.lock();
if (!calls_queue.empty()) {
auto front = calls_queue.front();
DLOG(INFO) << front << " will try to call";
calls_queue.pop();
LOG(INFO) << server.process(make_jsonrpc(front));
} else if (state.load() && !methods.empty()) {
auto it = methods.begin();
std::advance(it, distance++ % methods.size());
DLOG(INFO) << *it << " : Method at this position";
LOG(INFO) << server.process(make_jsonrpc(*it));
}
if (state.load() && calls_queue.empty())
cv.wait_for(lock, thr_sleep, [&] {
return stoken.stop_requested();
});
lock.unlock();
}
LOG(INFO) << "Stopped";
}
void start() { thr = std::jthread(&TestThread::routine, this); }
void join() {
if (thr.joinable()) {
DLOG(INFO) << "Requested thread stop";
thr.request_stop();
cv.notify_one();
thr.join();
}
}
void add_method(const callback_t cb, rpc_string *name) {
LOG(INFO) << "Trying to add method: " << name->s;
std::lock_guard lock(mtx);
if (methods.contains(name->s)) {
LOG(INFO) << "Method already exists: " << name->s;
return;
}
methods.emplace(name->s);
server.register_method(name->s, cb);
}
int remove_method(const rpc_string *name) {
LOG(INFO) << "Trying to remove method: " << name->s;
std::lock_guard lock(mtx);
int result = 0;
auto it = std::find(methods.begin(), methods.end(), name->s);
if (it != methods.end()) {
methods.erase(it);
} else {
LOG(ERROR) << "Method not found: " << name->s;
result = -1;
}
return result;
}
void call(const rpc_string *name) {
std::lock_guard lock(mtx);
LOG(INFO) << server.process(name->s);
}
void set_duration(uint64_t duration_ms) {
LOG(INFO) << "Trying to install sleep duration: " << duration_ms;
std::lock_guard lock(mtx);
thr_sleep = std::chrono::milliseconds(duration_ms);
}
uint64_t get_duration() {
std::lock_guard lock(mtx);
return thr_sleep.count();
}
void add_queue_call(const std::string &name) {
std::lock_guard lock(mtx);
calls_queue.emplace(name);
}
void auto_call(bool state) {
this->state.store(state, std::memory_order_relaxed);
}
void reset() {
std::lock_guard lock(mtx);
calls_queue = std::queue<std::string>();
methods.clear();
state.store(true, std::memory_order_relaxed);
server = score::RpcServer();
}
private:
std::atomic<bool> state{true};
std::condition_variable cv;
std::queue<std::string> calls_queue{};
std::set<std::string> methods{};
std::jthread thr;
std::mutex mtx;
score::RpcServer server;
std::chrono::duration<int64_t, std::milli> thr_sleep{50};
} test;
extern "C" {
void crpc_test_init() {
if (!google::IsGoogleLoggingInitialized())
google::InitGoogleLogging("TestRPC");
try {
test.start();
} catch (const std::exception &e) {
LOG(ERROR) << "Fatal error: " << e.what();
}
}
void crpc_test_deinit() {
test.join();
crpc_deinit();
test.reset();
}
void crpc_test_add_method(callback_t cb, rpc_string *name) {
test.add_method(cb, name);
}
void crpc_test_change_duration(uint64_t duration_ms) {
test.set_duration(duration_ms);
}
uint64_t crpc_test_duration() { return test.get_duration(); }
int crpc_test_remove_method(rpc_string *name) {
return test.remove_method(name);
}
void crpc_test_schedule_call(rpc_string *name) { test.add_queue_call(name->s); }
void crpc_test_auto_call(uint32_t state) {
test.auto_call(static_cast<bool>(state));
}
}

View File

@ -1,14 +0,0 @@
[wrap-file]
directory = base64-0.5.2
source_url = https://github.com/aklomp/base64/archive/refs/tags/v0.5.2.tar.gz
source_filename = base64-0.5.2.tar.gz
source_hash = 723a0f9f4cf44cf79e97bcc315ec8f85e52eb104c8882942c3f2fba95acc080d
source_fallback_url = https://wrapdb.mesonbuild.com/v2/aklomp-base64_0.5.2-1/get_source/base64-0.5.2.tar.gz
patch_filename = aklomp-base64_0.5.2-1_patch.zip
patch_url = https://wrapdb.mesonbuild.com/v2/aklomp-base64_0.5.2-1/get_patch
patch_fallback_url = https://github.com/mesonbuild/wrapdb/releases/download/aklomp-base64_0.5.2-1/aklomp-base64_0.5.2-1_patch.zip
patch_hash = 9805354b8c0333fe0123c10d8c62356ef1d0d67a2689a348d18f73bddc1e2b10
wrapdb_version = 0.5.2-1
[provide]
dependency_names = base64

View File

@ -1,14 +1,12 @@
test_sources = files(
'test_rpc.cpp',
'test_integration.cpp',
'test_tcp.cpp',
'test_cli.cpp',
'test_c_api.cpp',
'test_base64.cpp'
'test_tcp.cpp'
)
test_exe = executable('unit_tests',
test_sources,
dependencies : [cloud_point_rpc_dep, cloud_point_rpc_cli_dep, cloud_point_rpc_test_dep, json_dep, gtest_dep, gtest_main_dep, gmock_dep])
dependencies : [cloud_point_rpc_dep, gtest_dep, gtest_main_dep, gmock_dep])
test('unit_tests', test_exe)

View File

@ -1,31 +0,0 @@
//
// Created by vptyp on 11.03.2026.
//
#include <chrono>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <thread>
#include "cloud_point_rpc/config.hpp"
#include "cloud_point_rpc/rpc_coder.h"
class Base64Test : public ::testing::Test {
protected:
void SetUp() override {
}
void TearDown() override {
}
};
TEST_F(Base64Test, EncodeDecode) {
std::vector raw{'H', 'e', 'l', 'l', 'o', 'w', '\0'};
score::Base64RPCCoder coder;
auto encoded = coder.encode(raw);
LOG(INFO) << "encoded: " << encoded;
auto decoded = coder.decode(encoded);
EXPECT_EQ(std::ranges::equal(decoded, raw), true);
LOG(INFO) << "done";
}

View File

@ -1,169 +0,0 @@
#include "cloud_point_rpc/rpc_server.hpp"
#include "server_api.h"
#include "test_api.h"
#include <future>
#include <glog/logging.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
class TestCApi : public ::testing::Test {
protected:
void SetUp() override {
FLAGS_logtostderr = true;
if (!google::IsGoogleLoggingInitialized())
google::InitGoogleLogging("TestRPC");
EXPECT_NO_THROW(crpc_test_init());
}
void TearDown() override { crpc_test_deinit(); }
};
TEST_F(TestCApi, Base) {
rpc_string name;
name.s = "test";
static std::promise<bool> task;
std::future<bool> called = task.get_future();
DLOG(INFO) << "Test";
crpc_test_add_method(
+[](rpc_string *) -> rpc_string * {
static bool installed = false;
DLOG(INFO) << "Trying to do something";
if (!installed) {
DLOG(INFO) << "Trying to install";
installed = true;
task.set_value(installed);
}
DLOG(INFO) << "Go out";
return crpc_str_create("res", sizeof("res"));
},
&name);
using namespace std::chrono_literals;
const std::future_status res = called.wait_for(500ms);
EXPECT_NE(res, std::future_status::timeout);
EXPECT_EQ(called.get(), true);
DLOG(INFO) << "DONE";
}
TEST_F(TestCApi, AddedMultiple) {
constexpr int N = 4;
std::array<std::promise<bool>, N> tasks;
std::array<std::pair<std::future<bool>, rpc_string>, N> called;
// The Bridge: A static pointer local to this test function
static std::array<std::promise<bool>, N>* bridge;
bridge = &tasks;
for (int i = 0; i < N; i++) {
called[i].first = tasks[i].get_future();
std::string n = "test" + std::to_string(i);
called[i].second = rpc_string{n.c_str(), n.size()};
}
auto register_idx = [&]<size_t I>() {
crpc_test_add_method(+[](rpc_string*) -> rpc_string* {
static bool installed = false;
if (!installed) {
installed = true;
(*bridge)[I].set_value(true);
}
return crpc_str_create("res", sizeof("res"));
}, &called[I].second);
};
register_idx.template operator()<0>();
register_idx.template operator()<1>();
register_idx.template operator()<2>();
register_idx.template operator()<3>();
auto test_idx = [&]<size_t I>() {
using namespace std::chrono_literals;
const std::future_status res = called[I].first.wait_for(1000ms);
EXPECT_NE(res, std::future_status::timeout);
EXPECT_EQ(called[I].first.get(), true);
LOG(INFO) << "Done" << "; task=" << I;
};
test_idx.template operator()<0>();
test_idx.template operator()<1>();
test_idx.template operator()<2>();
test_idx.template operator()<3>();
}
TEST_F(TestCApi, RemoveMethod) {
rpc_string name{"test", sizeof("test") - 1};
EXPECT_EQ(crpc_test_remove_method(&name), -1);
}
TEST_F(TestCApi, ChangeDuration) {
constexpr int dur = 1e3;
EXPECT_NO_THROW(crpc_test_change_duration(dur));
EXPECT_EQ(crpc_test_duration(), dur);
LOG(INFO) << "Test";
}
TEST_F(TestCApi, ScheduleCall) {
EXPECT_NO_THROW(crpc_test_auto_call(0));
constexpr int N = 4;
std::array<std::promise<bool>, N> tasks;
std::array<std::pair<std::future<bool>, rpc_string>, N> called;
// The Bridge: A static pointer local to this test function
static std::array<std::promise<bool>, N>* bridge;
bridge = &tasks;
LOG(INFO) << "Started Schedule Call";
for (int i = 0; i < N; i++) {
called[i].first = tasks[i].get_future();
std::string n = "test" + std::to_string(i);
called[i].second = rpc_string{n.c_str(), n.size()};
}
auto register_idx = [&]<size_t I>() {
crpc_test_add_method(+[](rpc_string*) -> rpc_string* {
static bool installed = false;
if (!installed) {
installed = true;
(*bridge)[I].set_value(true);
}
return crpc_str_create("res", sizeof("res"));
}, &called[I].second);
};
auto test_idx = [&]<size_t I>() {
using namespace std::chrono_literals;
const std::future_status res = called[I].first.wait_for(500ms);
EXPECT_NE(res, std::future_status::timeout);
EXPECT_EQ(called[I].first.get(), true);
};
auto schedule_call = []<size_t I>() {
rpc_string test{"test", sizeof("test") - 1};
test.s += std::to_string(I);
LOG(INFO) << "Trying to add :" << test.s;
crpc_test_schedule_call(&test);
};
register_idx.template operator()<0>();
register_idx.template operator()<1>();
register_idx.template operator()<2>();
register_idx.template operator()<3>();
schedule_call.template operator()<0>();
schedule_call.template operator()<1>();
schedule_call.template operator()<2>();
schedule_call.template operator()<3>();
test_idx.template operator()<0>();
test_idx.template operator()<1>();
test_idx.template operator()<2>();
test_idx.template operator()<3>();
}
TEST_F(TestCApi, String) {
rpc_string name;
name.s = "test";
EXPECT_EQ(name.s.c_str(), crpc_str_get_data(&name));
EXPECT_EQ(name.s.size(), crpc_str_get_size(&name));
auto creation = crpc_str_create("test 2222", sizeof("test 2222"));
EXPECT_NO_THROW(crpc_str_destroy(creation));
}

View File

@ -9,26 +9,24 @@
#include "cloud_point_rpc/rpc_server.hpp"
#include "cloud_point_rpc/tcp_server.hpp"
using namespace score;
using namespace cloud_point_rpc;
class CliTest : public ::testing::Test {
public:
void start() {
tcp_server->start();
}
protected:
void SetUp() override {
server_ip = "127.0.0.1";
server_port = 9096;
rpc_server = std::make_unique<RpcServer>();
rpc_server->register_method(
"hello", [](const nlohmann::json &) { return "world"; });
tcp_server = std::make_unique<TcpServer>(
server_ip, server_port, [this](const std::string &req) {
return rpc_server->process(req);
});
tcp_server->start();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
@ -51,7 +49,7 @@ TEST_F(CliTest, SendsInputToServerAndReceivesResponse) {
"get-intrinsic-params", [](const nlohmann::json &) {
return std::vector<double>{14589.0, 22489.0, 3123124.555};
});
this->start();
input << "1" << std::endl;
input << "0" << std::endl;

View File

@ -11,7 +11,7 @@
#include <fstream>
using namespace score;
using namespace cloud_point_rpc;
class IntegrationTest : public ::testing::Test {
protected:
@ -69,7 +69,7 @@ class IntegrationTest : public ::testing::Test {
std::remove("config.yaml");
}
Config config_{};
Config config_;
std::unique_ptr<Service> service_;
std::unique_ptr<RpcServer> rpc_server_;
std::unique_ptr<TcpServer> tcp_server_;

View File

@ -7,7 +7,7 @@
#include <vector>
using json = nlohmann::json;
using namespace score;
using namespace cloud_point_rpc;
class RpcServerTest : public ::testing::Test {
protected:

View File

@ -15,11 +15,11 @@ class TcpTest : public ::testing::Test {
protected:
void SetUp() override {
server_ = std::make_unique<score::TcpServer>(
server_ = std::make_unique<cloud_point_rpc::TcpServer>(
"127.0.0.1", 12345, [this](const std::string &request) {
EXPECT_EQ(request, expected_);
std::string msg = "Echo: " + request;
auto v = score::serialize(msg.length());
auto v = cloud_point_rpc::serialize(msg.length());
std::string res(v.begin(), v.end());
res += msg;
return res;
@ -33,26 +33,26 @@ class TcpTest : public ::testing::Test {
}
std::string expected_;
std::unique_ptr<score::TcpServer> server_;
std::unique_ptr<cloud_point_rpc::TcpServer> server_;
};
TEST(SerializeTest, Base) {
uint64_t value{123};
auto res = score::serialize(value);
EXPECT_EQ(value, score::deserialize<uint64_t>(res));
auto res = cloud_point_rpc::serialize(value);
EXPECT_EQ(value, cloud_point_rpc::deserialize<uint64_t>(res));
}
TEST_F(TcpTest, EchoTest) {
constexpr std::string_view msg = "Hello, TCP Server!";
ExpectedResponse(msg.data());
score::TCPConnector connector("127.0.0.1", 12345);
cloud_point_rpc::TCPConnector connector("127.0.0.1", 12345);
auto res = connector.Send(msg.data());
}
TEST_F(TcpTest, HugeBuffer) {
static constexpr uint64_t w = 1280, h = 720, c = 3;
static constexpr uint64_t w = 1920, h = 1080, c = 3;
std::string data(w * h * c, 77);
ExpectedResponse(data);
score::TCPConnector connector("127.0.0.1", 12345);
cloud_point_rpc::TCPConnector connector("127.0.0.1", 12345);
auto res = connector.Send(data);
}