diff --git a/include/cloud_point_rpc/tcp_connector.hpp b/include/cloud_point_rpc/tcp_connector.hpp index e5286ef..030ab77 100644 --- a/include/cloud_point_rpc/tcp_connector.hpp +++ b/include/cloud_point_rpc/tcp_connector.hpp @@ -2,6 +2,7 @@ #include "cloud_point_rpc/serialize.hpp" #include "jsonrpccxx/iclientconnector.hpp" #include +#include #include #include @@ -34,42 +35,7 @@ class TCPConnector : public jsonrpccxx::IClientConnector { auto remove_const = request; inplace_size_embedding(remove_const); asio::write(socket_, asio::buffer(remove_const)); - return read(); - } - - protected: - std::string read() { - std::string result; - std::array header; - LOG(INFO) << "trying to read"; - try { - size_t len = - asio::read(socket_, asio::buffer(header, header.size())); - if (len != sizeof(uint64_t)) { - LOG(ERROR) << "failed to read header"; - return result; - } - std::vector v(header.begin(), header.begin() + 8); - uint64_t packet_size = deserialize(v); - LOG(INFO) << "Received packet of size: " << packet_size; - - std::vector payload(packet_size); - len = asio::read(socket_, asio::buffer(payload)); - - // if failed to fetch in one pack, try to read until size - while (len < packet_size) { - len += asio::read(socket_, asio::buffer(payload.data() + len, - payload.size() - len)); - } - - LOG(INFO) << std::format("Was able to read len={}", len); - result = std::string(payload.begin(), payload.end()); - LOG(INFO) << "Payload: \n" << result; - } catch (const std::exception &e) { - LOG(WARNING) << "Connector handling error: " << e.what(); - } - - return result; + return tcp_read(socket_, "TCPConnector] "); } private: diff --git a/include/cloud_point_rpc/tcp_read.hpp b/include/cloud_point_rpc/tcp_read.hpp new file mode 100644 index 0000000..8b71a0d --- /dev/null +++ b/include/cloud_point_rpc/tcp_read.hpp @@ -0,0 +1,43 @@ +#pragma once +#include +#include +#include +#include +namespace cloud_point_rpc { + +static inline std::string tcp_read(asio::ip::tcp::socket &socket, + std::string_view prefix) { + std::string result; + std::array header; + LOG(INFO) << prefix.data() << "trying to read"; + try { + size_t len = asio::read(socket, asio::buffer(header, header.size())); + if (len != sizeof(uint64_t)) { + LOG(ERROR) << prefix.data() << "failed to read header"; + return result; + } + std::vector v(header.begin(), header.begin() + 8); + uint64_t packet_size = deserialize(v); + LOG(INFO) << prefix.data() + << "Received packet of size: " << packet_size; + + std::vector payload(packet_size); + len = asio::read(socket, asio::buffer(payload)); + + // if failed to fetch in one pack, try to read until size + while (len < packet_size) { + len += asio::read(socket, asio::buffer(payload.data() + len, + payload.size() - len)); + } + + LOG(INFO) << prefix.data() + << std::format("Was able to read len={}", len); + result = std::string(payload.begin(), payload.end()); + DLOG(INFO) << prefix.data() << "Payload: \n" << result; + } catch (const std::exception &e) { + LOG(WARNING) << prefix.data() << "handling error: " << e.what(); + } + return result; +} + +} // namespace cloud_point_rpc \ No newline at end of file diff --git a/include/cloud_point_rpc/tcp_server.hpp b/include/cloud_point_rpc/tcp_server.hpp index f9278d8..b4556b2 100644 --- a/include/cloud_point_rpc/tcp_server.hpp +++ b/include/cloud_point_rpc/tcp_server.hpp @@ -1,8 +1,8 @@ #pragma once -#include "cloud_point_rpc/serialize.hpp" #include #include +#include #include #include #include @@ -98,32 +98,14 @@ class TcpServer { private: void handle_client(std::shared_ptr socket) { + LOG(INFO) << "Server reading from client..."; try { - LOG(INFO) << "Server reading from client..."; - - std::array header; - size_t header_length = asio::read(*socket, asio::buffer(header)); - if (header_length != 8) { - LOG(WARNING) << "Invalid header length: " << header_length; - return; - } - uint64_t packet_size = *reinterpret_cast(header.data()); - LOG(INFO) << "Expected packet size: " << packet_size; - - std::vector payload(packet_size); - size_t payload_length = asio::read(*socket, asio::buffer(payload)); - LOG(INFO) << "Read payload length: " << payload_length; - if (payload_length != packet_size) { - LOG(WARNING) << "Payload length mismatch: expected " - << packet_size << ", got " << payload_length; - return; - } + auto payload = tcp_read(*socket, "TCPServer] "); + size_t payload_length = payload.size(); if (payload_length > 0) { - std::string request(payload.data(), payload_length); - LOG(INFO) << "Server processing request: " << request; - std::string response = processor_(request); + std::string response = processor_(payload); response += "\n"; - LOG(INFO) << "Server sending response: " << response; + DLOG(INFO) << "Server sending response: " << response; inplace_size_embedding(response); asio::write(*socket, asio::buffer(response)); LOG(INFO) << "Server sent response"; diff --git a/tests/test_tcp.cpp b/tests/test_tcp.cpp index ba4fd9d..852f94c 100644 --- a/tests/test_tcp.cpp +++ b/tests/test_tcp.cpp @@ -50,7 +50,7 @@ TEST_F(TcpTest, EchoTest) { } TEST_F(TcpTest, HugeBuffer) { - static constexpr uint64_t w = 1920, h = 1080, c = 3; + static constexpr uint64_t w = 1280, h = 720, c = 3; std::string data(w * h * c, 77); ExpectedResponse(data); cloud_point_rpc::TCPConnector connector("127.0.0.1", 12345);