1
0
Fork 0
mirror of https://github.com/gwm17/Daqromancy.git synced 2024-11-22 18:58:51 -05:00

Simplify network messagees. We only send list data, so no need to specify if list or waves.

This commit is contained in:
Gordon McCann 2022-09-29 14:47:07 -04:00
parent 83d81ed1d7
commit 00bf570b51
3 changed files with 17 additions and 52 deletions

View File

@ -6,54 +6,19 @@
namespace Daqromancy { namespace Daqromancy {
enum class BSMessageType : uint64_t struct DYMessage
{ {
List, DYMessage() = default;
Mixed, //Maybe someday hahaha DYMessage(const std::vector<DYData>& data)
None
};
struct BSHeader
{
BSMessageType type = BSMessageType::None;
uint64_t size = 0;
};
struct BSMessage
{
BSMessage() = default;
BSMessage(const std::vector<DYData>& data)
{ {
header.type = BSMessageType::List; size = data.size() * Data::dataSize;
body.resize(size);
for (const auto& datum : data) for (const auto& datum : data)
LoadDYDataToBuffer(body, datum); LoadDYDataToBuffer(body, datum);
} }
BSHeader header; uint64_t size; //in bytes
std::vector<char> body; std::vector<char> body;
std::size_t Size() const
{
return header.size;
}
//Nasty work. Convert header into a raw bytes array for transmission. Makes it so padding is no longer a concern.
std::vector<char> GetHeaderRaw() const
{
std::vector<char> rawBytes(sizeof(header.type) + sizeof(header.size));
std::size_t position = 0;
int loopIndex;
char* dataPointer = (char*)&header.type;
for (loopIndex = 0; loopIndex < sizeof(header.type); loopIndex++)
rawBytes[position++] = dataPointer[loopIndex];
dataPointer = (char*)&header.size;
for (loopIndex = 0; loopIndex < sizeof(header.size); loopIndex++)
rawBytes[position++] = dataPointer[loopIndex];
return rawBytes;
}
}; };
} }

View File

@ -16,7 +16,7 @@ namespace Daqromancy {
that the loop of write calls and job submissions had stopped (or was not yet started), so we need to restart/start the that the loop of write calls and job submissions had stopped (or was not yet started), so we need to restart/start the
cycle of writing data to the socket. cycle of writing data to the socket.
*/ */
void TCPServerConnection::Send(const BSMessage& message) void TCPServerConnection::Send(const DYMessage& message)
{ {
asio::post(m_contextRef, asio::post(m_contextRef,
[this, message]() [this, message]()
@ -38,13 +38,13 @@ namespace Daqromancy {
void TCPServerConnection::WriteHeader() void TCPServerConnection::WriteHeader()
{ {
std::vector<char> headerData = m_queue.Front().GetHeaderRaw(); uint64_t messageSize = m_queue.Front().size;
asio::async_write(m_socket, asio::buffer(headerData, headerData.size()), asio::async_write(m_socket, asio::buffer(&messageSize, sizeof(messageSize)),
[this](std::error_code ec, std::size_t length) //Callback upon completion [this](std::error_code ec, std::size_t length) //Callback upon completion
{ {
if (!ec) if (!ec)
{ {
if (m_queue.Front().Size() > 0) if (m_queue.Front().size > 0)
WriteBody(); //submit next job to asio: write body data WriteBody(); //submit next job to asio: write body data
else else
{ {
@ -64,7 +64,7 @@ namespace Daqromancy {
void TCPServerConnection::WriteBody() void TCPServerConnection::WriteBody()
{ {
asio::async_write(m_socket, asio::buffer(m_queue.Front().body, m_queue.Front().Size()), asio::async_write(m_socket, asio::buffer(m_queue.Front().body, m_queue.Front().size),
[this](std::error_code ec, std::size_t length) [this](std::error_code ec, std::size_t length)
{ {
if (!ec) if (!ec)
@ -140,7 +140,7 @@ namespace Daqromancy {
if (!m_dataHandle.dataQueue->IsEmpty()) if (!m_dataHandle.dataQueue->IsEmpty())
continue; continue;
BSMessage message(m_dataHandle.dataQueue->Front()); DYMessage message(m_dataHandle.dataQueue->Front());
MessageClients(message); MessageClients(message);
m_dataHandle.dataQueue->PopFront(); m_dataHandle.dataQueue->PopFront();
} }
@ -159,7 +159,7 @@ namespace Daqromancy {
m_dataHandle.dataQueue->ResetWakeup(); m_dataHandle.dataQueue->ResetWakeup();
} }
void TCPServer::MessageClients(const BSMessage& message) void TCPServer::MessageClients(const DYMessage& message)
{ {
bool isAnyClientInvalid = false; bool isAnyClientInvalid = false;
std::scoped_lock<std::mutex> guard(m_clientMutex); std::scoped_lock<std::mutex> guard(m_clientMutex);

View File

@ -22,7 +22,7 @@ namespace Daqromancy {
~TCPServerConnection(); ~TCPServerConnection();
bool IsConnected() const { return m_socket.is_open(); } bool IsConnected() const { return m_socket.is_open(); }
void Send(const BSMessage& message); void Send(const DYMessage& message);
void Disconnect(); void Disconnect();
@ -33,7 +33,7 @@ namespace Daqromancy {
asio::ip::tcp::socket m_socket; asio::ip::tcp::socket m_socket;
asio::io_context& m_contextRef; asio::io_context& m_contextRef;
ThreadSafeQueue<BSMessage> m_queue; ThreadSafeQueue<DYMessage> m_queue;
}; };
/* /*
@ -86,7 +86,7 @@ namespace Daqromancy {
private: private:
void MessageClients(const BSMessage& message); void MessageClients(const DYMessage& message);
void WaitForClient(); void WaitForClient();
asio::io_context m_context; asio::io_context m_context;