[tcp] removed duplicate code
:Release Notes: removed unnecessary code duplication for TCP socket read in tcp_server and tcp_connector. Changed several log lines to appear only in debug build
This commit is contained in:
parent
8ee06fe6a0
commit
d26ab35339
@ -2,6 +2,7 @@
|
|||||||
#include "cloud_point_rpc/serialize.hpp"
|
#include "cloud_point_rpc/serialize.hpp"
|
||||||
#include "jsonrpccxx/iclientconnector.hpp"
|
#include "jsonrpccxx/iclientconnector.hpp"
|
||||||
#include <asio.hpp>
|
#include <asio.hpp>
|
||||||
|
#include <cloud_point_rpc/tcp_read.hpp>
|
||||||
#include <glog/logging.h>
|
#include <glog/logging.h>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
@ -34,42 +35,7 @@ class TCPConnector : public jsonrpccxx::IClientConnector {
|
|||||||
auto remove_const = request;
|
auto remove_const = request;
|
||||||
inplace_size_embedding(remove_const);
|
inplace_size_embedding(remove_const);
|
||||||
asio::write(socket_, asio::buffer(remove_const));
|
asio::write(socket_, asio::buffer(remove_const));
|
||||||
return read();
|
return tcp_read(socket_, "TCPConnector] ");
|
||||||
}
|
|
||||||
|
|
||||||
protected:
|
|
||||||
std::string read() {
|
|
||||||
std::string result;
|
|
||||||
std::array<char, 8> header;
|
|
||||||
LOG(INFO) << "trying to read";
|
|
||||||
try {
|
|
||||||
size_t len =
|
|
||||||
asio::read(socket_, asio::buffer(header, header.size()));
|
|
||||||
if (len != sizeof(uint64_t)) {
|
|
||||||
LOG(ERROR) << "failed to read header";
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
std::vector<uint8_t> v(header.begin(), header.begin() + 8);
|
|
||||||
uint64_t packet_size = deserialize<uint64_t>(v);
|
|
||||||
LOG(INFO) << "Received packet of size: " << packet_size;
|
|
||||||
|
|
||||||
std::vector<char> payload(packet_size);
|
|
||||||
len = asio::read(socket_, asio::buffer(payload));
|
|
||||||
|
|
||||||
// if failed to fetch in one pack, try to read until size
|
|
||||||
while (len < packet_size) {
|
|
||||||
len += asio::read(socket_, asio::buffer(payload.data() + len,
|
|
||||||
payload.size() - len));
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG(INFO) << std::format("Was able to read len={}", len);
|
|
||||||
result = std::string(payload.begin(), payload.end());
|
|
||||||
LOG(INFO) << "Payload: \n" << result;
|
|
||||||
} catch (const std::exception &e) {
|
|
||||||
LOG(WARNING) << "Connector handling error: " << e.what();
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|||||||
43
include/cloud_point_rpc/tcp_read.hpp
Normal file
43
include/cloud_point_rpc/tcp_read.hpp
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
#pragma once
|
||||||
|
#include <array>
|
||||||
|
#include <asio.hpp>
|
||||||
|
#include <cloud_point_rpc/serialize.hpp>
|
||||||
|
#include <glog/logging.h>
|
||||||
|
namespace cloud_point_rpc {
|
||||||
|
|
||||||
|
static inline std::string tcp_read(asio::ip::tcp::socket &socket,
|
||||||
|
std::string_view prefix) {
|
||||||
|
std::string result;
|
||||||
|
std::array<char, 8> header;
|
||||||
|
LOG(INFO) << prefix.data() << "trying to read";
|
||||||
|
try {
|
||||||
|
size_t len = asio::read(socket, asio::buffer(header, header.size()));
|
||||||
|
if (len != sizeof(uint64_t)) {
|
||||||
|
LOG(ERROR) << prefix.data() << "failed to read header";
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
std::vector<uint8_t> v(header.begin(), header.begin() + 8);
|
||||||
|
uint64_t packet_size = deserialize<uint64_t>(v);
|
||||||
|
LOG(INFO) << prefix.data()
|
||||||
|
<< "Received packet of size: " << packet_size;
|
||||||
|
|
||||||
|
std::vector<char> payload(packet_size);
|
||||||
|
len = asio::read(socket, asio::buffer(payload));
|
||||||
|
|
||||||
|
// if failed to fetch in one pack, try to read until size
|
||||||
|
while (len < packet_size) {
|
||||||
|
len += asio::read(socket, asio::buffer(payload.data() + len,
|
||||||
|
payload.size() - len));
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG(INFO) << prefix.data()
|
||||||
|
<< std::format("Was able to read len={}", len);
|
||||||
|
result = std::string(payload.begin(), payload.end());
|
||||||
|
DLOG(INFO) << prefix.data() << "Payload: \n" << result;
|
||||||
|
} catch (const std::exception &e) {
|
||||||
|
LOG(WARNING) << prefix.data() << "handling error: " << e.what();
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace cloud_point_rpc
|
||||||
@ -1,8 +1,8 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "cloud_point_rpc/serialize.hpp"
|
|
||||||
#include <asio.hpp>
|
#include <asio.hpp>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
#include <cloud_point_rpc/tcp_read.hpp>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <glog/logging.h>
|
#include <glog/logging.h>
|
||||||
#include <string>
|
#include <string>
|
||||||
@ -98,32 +98,14 @@ class TcpServer {
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
void handle_client(std::shared_ptr<asio::ip::tcp::socket> socket) {
|
void handle_client(std::shared_ptr<asio::ip::tcp::socket> socket) {
|
||||||
|
LOG(INFO) << "Server reading from client...";
|
||||||
try {
|
try {
|
||||||
LOG(INFO) << "Server reading from client...";
|
auto payload = tcp_read(*socket, "TCPServer] ");
|
||||||
|
size_t payload_length = payload.size();
|
||||||
std::array<char, 8> header;
|
|
||||||
size_t header_length = asio::read(*socket, asio::buffer(header));
|
|
||||||
if (header_length != 8) {
|
|
||||||
LOG(WARNING) << "Invalid header length: " << header_length;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
uint64_t packet_size = *reinterpret_cast<uint64_t *>(header.data());
|
|
||||||
LOG(INFO) << "Expected packet size: " << packet_size;
|
|
||||||
|
|
||||||
std::vector<char> payload(packet_size);
|
|
||||||
size_t payload_length = asio::read(*socket, asio::buffer(payload));
|
|
||||||
LOG(INFO) << "Read payload length: " << payload_length;
|
|
||||||
if (payload_length != packet_size) {
|
|
||||||
LOG(WARNING) << "Payload length mismatch: expected "
|
|
||||||
<< packet_size << ", got " << payload_length;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (payload_length > 0) {
|
if (payload_length > 0) {
|
||||||
std::string request(payload.data(), payload_length);
|
std::string response = processor_(payload);
|
||||||
LOG(INFO) << "Server processing request: " << request;
|
|
||||||
std::string response = processor_(request);
|
|
||||||
response += "\n";
|
response += "\n";
|
||||||
LOG(INFO) << "Server sending response: " << response;
|
DLOG(INFO) << "Server sending response: " << response;
|
||||||
inplace_size_embedding(response);
|
inplace_size_embedding(response);
|
||||||
asio::write(*socket, asio::buffer(response));
|
asio::write(*socket, asio::buffer(response));
|
||||||
LOG(INFO) << "Server sent response";
|
LOG(INFO) << "Server sent response";
|
||||||
|
|||||||
@ -50,7 +50,7 @@ TEST_F(TcpTest, EchoTest) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(TcpTest, HugeBuffer) {
|
TEST_F(TcpTest, HugeBuffer) {
|
||||||
static constexpr uint64_t w = 1920, h = 1080, c = 3;
|
static constexpr uint64_t w = 1280, h = 720, c = 3;
|
||||||
std::string data(w * h * c, 77);
|
std::string data(w * h * c, 77);
|
||||||
ExpectedResponse(data);
|
ExpectedResponse(data);
|
||||||
cloud_point_rpc::TCPConnector connector("127.0.0.1", 12345);
|
cloud_point_rpc::TCPConnector connector("127.0.0.1", 12345);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user