Skip to content

Commit 3e0a784

Browse files
committed
refactor GateIO::startListening by splitting it's huge body to serial nested function calls
1 parent be03fc2 commit 3e0a784

File tree

3 files changed

+193
-134
lines changed

3 files changed

+193
-134
lines changed

vpr/src/server/commconstants.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
#ifndef COMMCONSTS_H
22
#define COMMCONSTS_H
33

4+
#include <string_view>
5+
46
namespace comm {
57

68
constexpr const char* KEY_JOB_ID = "JOB_ID";
79
constexpr const char* KEY_CMD = "CMD";
810
constexpr const char* KEY_OPTIONS = "OPTIONS";
911
constexpr const char* KEY_DATA = "DATA";
1012
constexpr const char* KEY_STATUS = "STATUS";
11-
constexpr const char* ECHO_DATA = "ECHO";
13+
constexpr std::string_view ECHO_DATA{"ECHO"};
1214

1315
const unsigned char ZLIB_COMPRESSOR_ID = 'z';
1416
const unsigned char NONE_COMPRESSOR_ID = '\x0';

vpr/src/server/gateio.cpp

Lines changed: 168 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,9 @@
33
#ifndef NO_SERVER
44

55
#include "telegramparser.h"
6-
#include "telegrambuffer.h"
76
#include "commconstants.h"
87
#include "convertutils.h"
98

10-
#include "sockpp/tcp6_acceptor.h"
11-
129
#include <iostream>
1310

1411
namespace server {
@@ -63,6 +60,162 @@ void GateIO::moveTasksToSendQueue(std::vector<TaskPtr>& tasks)
6360
tasks.clear();
6461
}
6562

63+
GateIO::ActivityStatus GateIO::checkClientConnection(sockpp::tcp6_acceptor& tcpServer, std::unique_ptr<ClientAliveTracker>& clientAliveTrackerPtr, std::optional<sockpp::tcp6_socket>& clientOpt) {
64+
ActivityStatus status = ActivityStatus::WAITING_ACTIVITY;
65+
66+
sockpp::inet6_address peer;
67+
sockpp::tcp6_socket client = tcpServer.accept(&peer);
68+
if (client) {
69+
m_logger.queue(LogLevel::Info, "client", client.address().to_string() , "connection accepted");
70+
client.set_non_blocking(true);
71+
clientOpt = std::move(client);
72+
73+
status = ActivityStatus::CLIENT_ACTIVITY;
74+
}
75+
76+
return status;
77+
}
78+
79+
GateIO::ActivityStatus GateIO::handleSendingData(sockpp::tcp6_socket& client, std::unique_ptr<ClientAliveTracker>& clientAliveTrackerPtr) {
80+
ActivityStatus status = ActivityStatus::WAITING_ACTIVITY;
81+
std::unique_lock<std::mutex> lock(m_tasksMutex);
82+
83+
if (!m_sendTasks.empty()) {
84+
const TaskPtr& task = m_sendTasks.at(0);
85+
try {
86+
std::size_t bytesToSend = std::min(CHUNK_MAX_BYTES_NUM, task->responseBuffer().size());
87+
std::size_t bytesActuallyWritten = client.write_n(task->responseBuffer().data(), bytesToSend);
88+
if (bytesActuallyWritten <= task->origReponseBytesNum()) {
89+
task->chopNumSentBytesFromResponseBuffer(bytesActuallyWritten);
90+
m_logger.queue(LogLevel::Detail,
91+
"sent chunk:", getPrettySizeStrFromBytesNum(bytesActuallyWritten),
92+
"from", getPrettySizeStrFromBytesNum(task->origReponseBytesNum()),
93+
"left:", getPrettySizeStrFromBytesNum(task->responseBuffer().size()));
94+
status = ActivityStatus::CLIENT_ACTIVITY;
95+
}
96+
} catch(...) {
97+
m_logger.queue(LogLevel::Detail, "error while writing chunk");
98+
status = ActivityStatus::COMMUNICATION_PROBLEM;
99+
}
100+
101+
if (task->isResponseFullySent()) {
102+
m_logger.queue(LogLevel::Info, "sent:", task->telegramHeader().info(), task->info());
103+
}
104+
}
105+
106+
// remove reported tasks
107+
std::size_t tasksBeforeRemoving = m_sendTasks.size();
108+
109+
auto partitionIter = std::partition(m_sendTasks.begin(), m_sendTasks.end(),
110+
[](const TaskPtr& task) { return !task->isResponseFullySent(); });
111+
m_sendTasks.erase(partitionIter, m_sendTasks.end());
112+
bool removingTookPlace = tasksBeforeRemoving != m_sendTasks.size();
113+
if (!m_sendTasks.empty() && removingTookPlace) {
114+
m_logger.queue(LogLevel::Detail, "left tasks num to send ", m_sendTasks.size());
115+
}
116+
117+
return status;
118+
}
119+
120+
GateIO::ActivityStatus GateIO::handleReceivingData(sockpp::tcp6_socket& client, comm::TelegramBuffer& telegramBuff, std::string& receivedMessage) {
121+
ActivityStatus status = ActivityStatus::WAITING_ACTIVITY;
122+
if (receivedMessage.size() != CHUNK_MAX_BYTES_NUM) {
123+
receivedMessage.resize(CHUNK_MAX_BYTES_NUM);
124+
}
125+
std::size_t bytesActuallyReceived{0};
126+
try {
127+
bytesActuallyReceived = client.read_n(&receivedMessage[0], CHUNK_MAX_BYTES_NUM);
128+
} catch(...) {
129+
m_logger.queue(LogLevel::Error, "fail to receiving");
130+
status = ActivityStatus::COMMUNICATION_PROBLEM;
131+
}
132+
133+
if ((bytesActuallyReceived > 0) && (bytesActuallyReceived <= CHUNK_MAX_BYTES_NUM)) {
134+
m_logger.queue(LogLevel::Detail, "received chunk:", getPrettySizeStrFromBytesNum(bytesActuallyReceived));
135+
telegramBuff.append(comm::ByteArray{receivedMessage.c_str(), bytesActuallyReceived});
136+
status = ActivityStatus::CLIENT_ACTIVITY;
137+
}
138+
139+
return status;
140+
}
141+
142+
GateIO::ActivityStatus GateIO::handleTelegrams(std::vector<comm::TelegramFramePtr>& telegramFrames, comm::TelegramBuffer& telegramBuff) {
143+
ActivityStatus status = ActivityStatus::WAITING_ACTIVITY;
144+
telegramFrames.clear();
145+
telegramBuff.takeTelegramFrames(telegramFrames);
146+
for (const comm::TelegramFramePtr& telegramFrame: telegramFrames) {
147+
// process received data
148+
std::string message{telegramFrame->data.to_string()};
149+
bool isEchoTelegram = false;
150+
if ((message.size() == comm::ECHO_DATA.size()) && (message == comm::ECHO_DATA)) {
151+
m_logger.queue(LogLevel::Detail, "received", comm::ECHO_DATA);
152+
isEchoTelegram = true;
153+
status = ActivityStatus::CLIENT_ACTIVITY;
154+
}
155+
156+
if (!isEchoTelegram) {
157+
m_logger.queue(LogLevel::Detail, "received composed", getPrettySizeStrFromBytesNum(message.size()), ":", getTruncatedMiddleStr(message));
158+
std::optional<int> jobIdOpt = comm::TelegramParser::tryExtractFieldJobId(message);
159+
std::optional<int> cmdOpt = comm::TelegramParser::tryExtractFieldCmd(message);
160+
std::optional<std::string> optionsOpt;
161+
comm::TelegramParser::tryExtractFieldOptions(message, optionsOpt);
162+
if (jobIdOpt && cmdOpt && optionsOpt) {
163+
TaskPtr task = std::make_unique<Task>(jobIdOpt.value(), cmdOpt.value(), optionsOpt.value());
164+
const comm::TelegramHeader& header = telegramFrame->header;
165+
m_logger.queue(LogLevel::Info, "received:", header.info(), task->info(/*skipDuration*/true));
166+
std::unique_lock<std::mutex> lock(m_tasksMutex);
167+
m_receivedTasks.push_back(std::move(task));
168+
} else {
169+
m_logger.queue(LogLevel::Error, "broken telegram detected, fail extract options from", message);
170+
}
171+
}
172+
}
173+
174+
return status;
175+
}
176+
177+
GateIO::ActivityStatus GateIO::handleClientAliveTracker(sockpp::tcp6_socket& client, std::unique_ptr<ClientAliveTracker>& clientAliveTrackerPtr)
178+
{
179+
ActivityStatus status = ActivityStatus::WAITING_ACTIVITY;
180+
if (clientAliveTrackerPtr) {
181+
/// handle sending echo to client
182+
if (clientAliveTrackerPtr->isTimeToSentEcho()) {
183+
comm::TelegramHeader echoHeader = comm::TelegramHeader::constructFromData(comm::ECHO_DATA);
184+
std::string message = echoHeader.buffer().to_string();
185+
message.append(comm::ECHO_DATA);
186+
try {
187+
std::size_t bytesActuallySent = client.write(message);
188+
if (bytesActuallySent == message.size()) {
189+
m_logger.queue(LogLevel::Detail, "sent", comm::ECHO_DATA);
190+
clientAliveTrackerPtr->onEchoSent();
191+
}
192+
} catch(...) {
193+
m_logger.queue(LogLevel::Debug, "fail to sent", comm::ECHO_DATA);
194+
status = ActivityStatus::COMMUNICATION_PROBLEM;
195+
}
196+
}
197+
198+
/// handle client timeout
199+
if (clientAliveTrackerPtr->isClientTimeout()) {
200+
m_logger.queue(LogLevel::Error, "client didn't respond too long");
201+
status = ActivityStatus::COMMUNICATION_PROBLEM;
202+
}
203+
}
204+
205+
return status;
206+
}
207+
208+
void GateIO::handleActivityStatus(ActivityStatus status, std::unique_ptr<ClientAliveTracker>& clientAliveTrackerPtr, bool& isCommunicationProblemDetected)
209+
{
210+
if (status == ActivityStatus::CLIENT_ACTIVITY) {
211+
if (clientAliveTrackerPtr) {
212+
clientAliveTrackerPtr->onClientActivity();
213+
}
214+
} else if (status == ActivityStatus::COMMUNICATION_PROBLEM) {
215+
isCommunicationProblemDetected = true;
216+
}
217+
}
218+
66219
void GateIO::startListening()
67220
{
68221
#ifdef ENABLE_CLIENT_ALIVE_TRACKER
@@ -72,17 +225,13 @@ void GateIO::startListening()
72225
std::unique_ptr<ClientAliveTracker> clientAliveTrackerPtr;
73226
#endif
74227

75-
static const std::string echoData{comm::ECHO_DATA};
76-
77228
comm::TelegramBuffer telegramBuff;
78229
std::vector<comm::TelegramFramePtr> telegramFrames;
79230

80231
sockpp::initialize();
81232
sockpp::tcp6_acceptor tcpServer(m_portNum);
82233
tcpServer.set_non_blocking(true);
83234

84-
const std::size_t chunkMaxBytesNum = 2*1024*1024; // 2Mb
85-
86235
if (tcpServer) {
87236
m_logger.queue(LogLevel::Info, "open server, port=", m_portNum);
88237
} else {
@@ -97,119 +246,29 @@ void GateIO::startListening()
97246
while(m_isRunning.load()) {
98247
bool isCommunicationProblemDetected = false;
99248

100-
/// check for the client connection
101249
if (!clientOpt) {
102-
sockpp::inet6_address peer;
103-
sockpp::tcp6_socket client = tcpServer.accept(&peer);
104-
if (client) {
105-
m_logger.queue(LogLevel::Info, "client", client.address().to_string() , "connection accepted");
106-
client.set_non_blocking(true);
107-
clientOpt = std::move(client);
108-
250+
ActivityStatus status = checkClientConnection(tcpServer, clientAliveTrackerPtr, clientOpt);
251+
if (status == ActivityStatus::CLIENT_ACTIVITY) {
109252
if (clientAliveTrackerPtr) {
110253
clientAliveTrackerPtr->reset();
111254
}
112255
}
113256
}
114257

115258
if (clientOpt) {
116-
sockpp::tcp6_socket& client = clientOpt.value();
259+
sockpp::tcp6_socket& client = clientOpt.value(); // shortcut
117260

118-
/// handle sending response
119-
{
120-
std::unique_lock<std::mutex> lock(m_tasksMutex);
121-
122-
if (!m_sendTasks.empty()) {
123-
const TaskPtr& task = m_sendTasks.at(0);
124-
try {
125-
std::size_t bytesToSend = std::min(chunkMaxBytesNum, task->responseBuffer().size());
126-
std::size_t bytesActuallyWritten = client.write_n(task->responseBuffer().data(), bytesToSend);
127-
if (bytesActuallyWritten <= task->origReponseBytesNum()) {
128-
task->chopNumSentBytesFromResponseBuffer(bytesActuallyWritten);
129-
m_logger.queue(LogLevel::Detail,
130-
"sent chunk:", getPrettySizeStrFromBytesNum(bytesActuallyWritten),
131-
"from", getPrettySizeStrFromBytesNum(task->origReponseBytesNum()),
132-
"left:", getPrettySizeStrFromBytesNum(task->responseBuffer().size()));
133-
if (clientAliveTrackerPtr) {
134-
clientAliveTrackerPtr->onClientActivity();
135-
}
136-
}
137-
} catch(...) {
138-
m_logger.queue(LogLevel::Detail, "error while writing chunk");
139-
isCommunicationProblemDetected = true;
140-
}
141-
142-
if (task->isResponseFullySent()) {
143-
m_logger.queue(LogLevel::Info,
144-
"sent:", task->telegramHeader().info(), task->info());
145-
}
146-
}
147-
148-
// remove reported tasks
149-
std::size_t tasksBeforeRemoving = m_sendTasks.size();
150-
151-
auto partitionIter = std::partition(m_sendTasks.begin(), m_sendTasks.end(),
152-
[](const TaskPtr& task) { return !task->isResponseFullySent(); });
153-
m_sendTasks.erase(partitionIter, m_sendTasks.end());
154-
bool removingTookPlace = tasksBeforeRemoving != m_sendTasks.size();
155-
if (!m_sendTasks.empty() && removingTookPlace) {
156-
m_logger.queue(LogLevel::Detail, "left tasks num to send ", m_sendTasks.size());
157-
}
158-
} // release lock
261+
/// handle sending
262+
ActivityStatus status = handleSendingData(client, clientAliveTrackerPtr);
263+
handleActivityStatus(status, clientAliveTrackerPtr, isCommunicationProblemDetected);
159264

160265
/// handle receiving
161-
if (receivedMessage.size() != chunkMaxBytesNum) {
162-
receivedMessage.resize(chunkMaxBytesNum);
163-
}
164-
std::size_t bytesActuallyReceived{0};
165-
try {
166-
bytesActuallyReceived = client.read_n(&receivedMessage[0], chunkMaxBytesNum);
167-
} catch(...) {
168-
m_logger.queue(LogLevel::Error, "fail to receiving");
169-
isCommunicationProblemDetected = true;
170-
}
171-
172-
if ((bytesActuallyReceived > 0) && (bytesActuallyReceived <= chunkMaxBytesNum)) {
173-
m_logger.queue(LogLevel::Detail, "received chunk:", getPrettySizeStrFromBytesNum(bytesActuallyReceived));
174-
telegramBuff.append(comm::ByteArray{receivedMessage.c_str(), bytesActuallyReceived});
175-
if (clientAliveTrackerPtr) {
176-
clientAliveTrackerPtr->onClientActivity();
177-
}
178-
}
266+
status = handleReceivingData(client, telegramBuff, receivedMessage);
267+
handleActivityStatus(status, clientAliveTrackerPtr, isCommunicationProblemDetected);
179268

180269
/// handle telegrams
181-
telegramFrames.clear();
182-
telegramBuff.takeTelegramFrames(telegramFrames);
183-
for (const comm::TelegramFramePtr& telegramFrame: telegramFrames) {
184-
// process received data
185-
std::string message{telegramFrame->data.to_string()};
186-
bool isEchoTelegram = false;
187-
if (clientAliveTrackerPtr) {
188-
if ((message.size() == echoData.size()) && (message == echoData)) {
189-
m_logger.queue(LogLevel::Detail, "received", echoData);
190-
clientAliveTrackerPtr->onClientActivity();
191-
isEchoTelegram = true;
192-
}
193-
}
194-
195-
if (!isEchoTelegram) {
196-
m_logger.queue(LogLevel::Detail, "received composed", getPrettySizeStrFromBytesNum(message.size()), ":", getTruncatedMiddleStr(message));
197-
std::optional<int> jobIdOpt = comm::TelegramParser::tryExtractFieldJobId(message);
198-
std::optional<int> cmdOpt = comm::TelegramParser::tryExtractFieldCmd(message);
199-
std::optional<std::string> optionsOpt;
200-
comm::TelegramParser::tryExtractFieldOptions(message, optionsOpt);
201-
if (jobIdOpt && cmdOpt && optionsOpt) {
202-
TaskPtr task = std::make_unique<Task>(jobIdOpt.value(), cmdOpt.value(), optionsOpt.value());
203-
const comm::TelegramHeader& header = telegramFrame->header;
204-
m_logger.queue(LogLevel::Info,
205-
"received:", header.info(), task->info(/*skipDuration*/true));
206-
std::unique_lock<std::mutex> lock(m_tasksMutex);
207-
m_receivedTasks.push_back(std::move(task));
208-
} else {
209-
m_logger.queue(LogLevel::Error, "broken telegram detected, fail extract options from", message);
210-
}
211-
}
212-
}
270+
status = handleTelegrams(telegramFrames, telegramBuff);
271+
handleActivityStatus(status, clientAliveTrackerPtr, isCommunicationProblemDetected);
213272

214273
// forward telegramBuffer errors
215274
std::vector<std::string> telegramBufferErrors;
@@ -219,31 +278,8 @@ void GateIO::startListening()
219278
}
220279

221280
/// handle client alive tracker
222-
if (clientAliveTrackerPtr) {
223-
if (clientAliveTrackerPtr->isTimeToSentEcho()) {
224-
comm::TelegramHeader echoHeader = comm::TelegramHeader::constructFromData(echoData);
225-
std::string message = echoHeader.buffer().to_string();
226-
message.append(echoData);
227-
try {
228-
std::size_t bytesActuallySent = client.write(message);
229-
if (bytesActuallySent == message.size()) {
230-
m_logger.queue(LogLevel::Detail, "sent", echoData);
231-
clientAliveTrackerPtr->onEchoSent();
232-
}
233-
} catch(...) {
234-
m_logger.queue(LogLevel::Debug, "fail to sent", echoData);
235-
isCommunicationProblemDetected = true;
236-
}
237-
}
238-
}
239-
240-
/// handle client alive
241-
if (clientAliveTrackerPtr) {
242-
if (clientAliveTrackerPtr->isClientTimeout()) {
243-
m_logger.queue(LogLevel::Error, "client didn't respond too long");
244-
isCommunicationProblemDetected = true;
245-
}
246-
}
281+
status = handleClientAliveTracker(client, clientAliveTrackerPtr);
282+
handleActivityStatus(status, clientAliveTrackerPtr, isCommunicationProblemDetected);
247283

248284
/// handle communication problem
249285
if (isCommunicationProblemDetected) {

0 commit comments

Comments
 (0)