1
0
Fork 0
mirror of https://github.com/gwm17/Specter.git synced 2024-11-22 18:28:52 -05:00

Add TCPClient class (to make new types of sources easier) and update CompassOnlineSource to reflect. Added handling of fragmented data, but not perfect solution, see comments in CompassOnlineSource

This commit is contained in:
Gordon McCann 2022-04-17 17:08:06 -04:00
parent 8ea527c69f
commit 1a1b2c90c9
6 changed files with 346 additions and 55 deletions

View File

@ -1,7 +1,7 @@
/*
CompassOnlineSource.cpp
A data source for online CAEN CoMPASS Data. Creates a tcp socket, connects to the remote source, and pulls data to a buffer. Data is then converted
from the CAEN CoMPASS format to the native NavData format. Uses asio as the networking library (see asio docs). Note that here we use syncrhonous since we
A data source for online CAEN CoMPASS Data. Uses TCPClient to connect to the CoMPASS server. Data is then converted
from the CAEN CoMPASS format to the native NavData format. Note that here we use syncrhonous since we
need to know if the buffer is/was filled, however we use non-blocking since we don't want the entire process to hang on attempting a connection or waiting
for data to come over the pipe. We handle the case of an un-filled buffer internally.
@ -12,57 +12,52 @@
digitizers have the SAME number of channels. By default CompassRun assumes 16 channels per board, as this is what is used with the SE-SPS setup at FoxLab.
If you use a different set of boards, CHANGE THIS VALUE! If you use mixed boards, you will need to invent a new id scheme altogether.
GWM -- Feb 2022
ADDITIONALLY
CoMPASS servers provide no stream side information on the state of a transfer (verified via communication w/ CAEN). That is: there are no headers or enders on the data transfers.
This forces us to use the size of a single CoMPASS datum (CompassHit) to determine the state of a transfer. If the read buffer size is not a whole multiple of CompassHits, the data
was determined to be fragmented. This has a huge drawback: in general there is no guarantee that the first byte of the transferred data is the first byte of a CompassHit. This means that
Navigator MUST be connected to the CoMPASS server BEFORE starting the aquisition. Otherwise the program could endup in a state of scrambled unpacking (also verified w/ CAEN).
Maybe we can get them to change this? Headers reeaaally should exist for transfers like this.
GWM -- April 2022
*/
#include "CompassOnlineSource.h"
namespace Navigator {
CompassOnlineSource::CompassOnlineSource(const std::string& hostname, const std::string& port) :
DataSource(), m_bufferIter(nullptr), m_bufferEnd(nullptr), m_socket(m_socketContext)
DataSource(), m_bufferIter(nullptr), m_bufferEnd(nullptr)
{
m_buffer.resize(m_bufferSize);
InitSocket(hostname, port);
InitConnection(hostname, port);
}
CompassOnlineSource::~CompassOnlineSource() {}
void CompassOnlineSource::InitSocket(const std::string& hostname, const std::string& port)
void CompassOnlineSource::InitConnection(const std::string& hostname, const std::string& port)
{
m_validFlag = false;
//asio tends to work in terms of exceptions, which we avoid in the rest of this project.
//If the connection is unsuccessful, an exception is thrown, which we parse and print to the terminal.
try
m_connection.Connect(hostname, port);
if (m_connection.IsOpen())
{
asio::ip::tcp::resolver resolver(m_socketContext);
auto end_points = resolver.resolve(hostname, port);
asio::connect(m_socket, end_points);
m_socket.non_blocking(true); //Set the connection as non-blocking
}
catch (asio::system_error& error)
{
NAV_ERROR("Unable to connect to remote port for CompassOnlineSource! Error Code: {0}", error.what());
return;
}
NAV_INFO("Successfully connected to host {0} on port {1}", hostname, port);
m_validFlag = true;
}
}
const NavData& CompassOnlineSource::GetData()
{
size_t range = m_bufferEnd - m_bufferIter; //how much buffer we have left
if (!IsValid())
{
NAV_ERROR("Attempting to access invalid source at CompassOnlineSource!");
m_datum = NavData();
return m_datum;
}
else if (m_bufferIter == nullptr || m_bufferIter == m_bufferEnd)
else if (m_bufferIter == nullptr || range < m_datasize || m_bufferIter == m_bufferEnd) //If no buffer/buffer completely used/buffer fragmented fill
{
FillBuffer();
}
if (m_bufferIter != m_bufferEnd)
if (m_bufferIter != m_bufferEnd && range >= m_datasize)//If buffer and enough data for a hit, get it
GetHit();
else
{
@ -80,26 +75,21 @@ namespace Navigator {
void CompassOnlineSource::FillBuffer()
{
asio::error_code code;
std::vector<char> recieved = m_connection.Read();
//If we didn't finish the last buffer toss all of the stuff we used and then append the recieved data
//Otherwise, copy over the recieved buffer. Note lack of vector::resize, vector::reserve. Intentional for performance.
//The amount of copying/resizing is best handled by the std (according to multiple references)
if (m_bufferIter != m_bufferEnd)
{
size_t pos = m_bufferEnd - m_bufferIter;
m_currentBuffer.erase(m_currentBuffer.begin(), m_currentBuffer.begin() + (m_currentBuffer.size() - pos)); //remove used bytes
m_currentBuffer.insert(m_currentBuffer.end(), recieved.begin(), recieved.end());
}
else
m_currentBuffer = recieved;
size_t length = m_socket.read_some(asio::buffer(m_buffer), code);
m_bufferEnd = m_buffer.data() + length;
m_bufferIter = m_buffer.data();
if (code == asio::error::eof)
{
NAV_WARN("CompassOnlineSource invalidated by host. Invalidating and detaching source.");
m_validFlag = false;
}
else if (code == asio::error::would_block) //Ignore cases where the socket would have blocked
{
return;
}
else if (code)
{
NAV_ERROR("CompassOnlineSource recieved unexpected error from host. Error message: {0}", code.message());
NAV_WARN("Invalidating and detaching source.");
m_validFlag = false;
}
m_bufferIter = m_currentBuffer.data();
m_bufferEnd = m_currentBuffer.data() + m_currentBuffer.size();
}
void CompassOnlineSource::GetHit()

View File

@ -1,7 +1,7 @@
/*
CompassOnlineSource.h
A data source for online CAEN CoMPASS Data. Creates a tcp socket, connects to the remote source, and pulls data to a buffer. Data is then converted
from the CAEN CoMPASS format to the native NavData format. Uses asio as the networking library (see asio docs). Note that here we use syncrhonous since we
A data source for online CAEN CoMPASS Data. Uses TCPClient to connect to the CoMPASS server. Data is then converted
from the CAEN CoMPASS format to the native NavData format. Note that here we use syncrhonous since we
need to know if the buffer is/was filled, however we use non-blocking since we don't want the entire process to hang on attempting a connection or waiting
for data to come over the pipe. We handle the case of an un-filled buffer internally.
@ -12,14 +12,21 @@
digitizers have the SAME number of channels. By default CompassRun assumes 16 channels per board, as this is what is used with the SE-SPS setup at FoxLab.
If you use a different set of boards, CHANGE THIS VALUE! If you use mixed boards, you will need to invent a new id scheme altogether.
GWM -- Feb 2022
ADDITIONALLY
CoMPASS servers provide no stream side information on the state of a transfer (verified via communication w/ CAEN). That is: there are no headers or enders on the data transfers.
This forces us to use the size of a single CoMPASS datum (CompassHit) to determine the state of a transfer. If the read buffer size is not a whole multiple of CompassHits, the data
was determined to be fragmented. This has a huge drawback: in general there is no guarantee that the first byte of the transferred data is the first byte of a CompassHit. This means that
Navigator MUST be connected to the CoMPASS server BEFORE starting the aquisition. Otherwise the program could endup in a state of scrambled unpacking (also verified w/ CAEN).
Maybe we can get them to change this? Headers reeaaally should exist for transfers like this.
GWM -- April 2022
*/
#ifndef COMPASS_ONLINE_SOURCE_H
#define COMPASS_ONLINE_SOURCE_H
#include "Navigator/Physics/DataSource.h"
#include "Navigator/Utils/TCPClient.h"
#include "CompassHit.h"
#include "asio.hpp"
namespace Navigator {
@ -32,19 +39,20 @@ namespace Navigator {
virtual const NavData& GetData() override;
private:
void InitSocket(const std::string& hostname, const std::string& port);
void InitConnection(const std::string& hostname, const std::string& port);
void FillBuffer();
void GetHit();
std::vector<char> m_buffer;
static constexpr size_t m_bufferSize = 24000; //Max amount of data we allow the source to buffer in. I don't think this should ever be maxed?
std::vector<char> m_currentBuffer;
std::vector<char> m_fragment;
static constexpr int m_datasize = 24; //size of CoMPASS hit in bytes, change as needed (if for example you have calibrated energies)
const int m_nchannels_per_board = 16; //IMPORTANT: Used for ID'ing channels uniquely. If you use boards with 32 or 8 or 64 channels you must change this! If you mix boards with
//different numbers of channels, you will have to find a different id solution.
char* m_bufferIter;
char* m_bufferEnd;
CompassHit m_currentHit;
asio::io_context m_socketContext;
asio::ip::tcp::socket m_socket;
TCPClient m_connection;
};

View File

@ -0,0 +1,85 @@
/*
TCPClient.h
Navigator's TCP client rep using asio. Contains very basic ability to read and write from a tcp socket in a non-blocking, synchronous method.
See asio docs for more details and examples on how to use.
GWM -- April 2022
Note: the write functionality has not been verified. Should be fine, but test before using.
*/
#include "TCPClient.h"
namespace Navigator {
TCPClient::TCPClient() :
m_socket(m_context)
{
m_readBuffer.resize(s_readBufferSize);
}
TCPClient::TCPClient(const std::string& host, const std::string& port) :
m_socket(m_context)
{
m_readBuffer.resize(s_readBufferSize);
Connect(host, port);
}
TCPClient::~TCPClient()
{
Close();
}
void TCPClient::Connect(const std::string& host, const std::string& port)
{
try
{
asio::ip::tcp::resolver resolver(m_context);
auto end_points = resolver.resolve(host, port);
asio::connect(m_socket, end_points);
m_socket.non_blocking(true); //Set the connection as non-blocking
}
catch (asio::system_error& e)
{
NAV_ERROR("Unable to connect to remote port for TCPClient! Error Code: {0}", e.what());
}
}
std::vector<char> TCPClient::Read()
{
asio::error_code code;
size_t length = m_socket.read_some(asio::buffer(m_readBuffer, m_readBuffer.size()), code);
if (code == asio::error::eof)
{
NAV_WARN("Server has closed connection. Closing the TCPClient");
Close();
}
else if (code && code != asio::error::would_block)
{
NAV_ERROR("TCPClient::Read recieved unexpected error from host. Error message: {0}", code.message());
NAV_WARN("Closing the socket.");
Close();
}
return std::vector<char>(m_readBuffer.begin(), m_readBuffer.begin()+length);
}
//untested, not currently used.
size_t TCPClient::Write(const std::vector<char>& data)
{
asio::error_code code;
m_writeBuffer = data;
size_t length = m_socket.write_some(asio::buffer(m_writeBuffer, m_writeBuffer.size()), code);
if (code == asio::error::eof)
{
NAV_WARN("Server has closed connection. Closing the TCPClient.");
Close();
}
else if(code && code != asio::error::would_block)
{
NAV_ERROR("TCPClient::Write recieved unexpected error from host. Error message: {0}", code.message());
NAV_WARN("Closing the socket");
Close();
}
return length;
}
}

View File

@ -0,0 +1,41 @@
/*
TCPClient.h
Navigator's TCP client rep using asio. Contains very basic ability to read and write from a tcp socket in a non-blocking, synchronous method.
See asio docs for more details and examples on how to use.
GWM -- April 2022
Note: the write functionality has not been verified. Should be fine, but test before using.
*/
#ifndef TCPCLIENT_H
#define TCPCLIENT_H
#include <asio.hpp>
namespace Navigator {
class TCPClient
{
public:
TCPClient();
TCPClient(const std::string& host, const std::string& port);
~TCPClient();
void Connect(const std::string& host, const std::string& port);
std::vector<char> Read();
size_t Write(const std::vector<char>& data);
inline void Close() { if(IsOpen()) m_socket.close(); }
inline bool IsOpen() { return m_socket.is_open(); }
private:
std::vector<char> m_readBuffer;
std::vector<char> m_writeBuffer;
static constexpr size_t s_readBufferSize = 24000; // reserve some space for the read buffer
asio::io_context m_context;
asio::ip::tcp::socket m_socket;
};
}
#endif

View File

@ -19,7 +19,7 @@ namespace Navigator {
/* TCPConnection */
TCPConnection::TCPConnection(asio::io_context& context) :
m_socket(context), m_buffer(24)
m_socket(context), m_buffer(36)
{
}
@ -27,15 +27,20 @@ namespace Navigator {
void TCPConnection::Start()
{
CreateBinaryBuffer(); //Generate Buffer
//CreateBinaryBufferFragmented(); //Generate fragmented buffer
//Actually write the buffer to the socket. Use std::bind to set a callback function for error handling or any other server side actions
asio::async_write(m_socket, asio::buffer(m_buffer),
asio::async_write(m_socket, asio::buffer(m_buffer, m_buffer.size()),
std::bind(&TCPConnection::HandleWrite, this, std::placeholders::_1, std::placeholders::_2));
}
//Server-side connection actions upon attempting write, only on for debugging
void TCPConnection::HandleWrite(const asio::error_code& ec, size_t bytes)
{
/*
static int i = 0;
if(!ec)
NAV_INFO("Number written: {0}", ++i);
*/
//NAV_INFO("Writer result: Asio Error -- {0} Amount transferred={1}", ec.message(), bytes);
}
@ -54,6 +59,7 @@ namespace Navigator {
m_hit.flags = 0;
m_hit.Ns = 0;
char* data_pointer;
int buffer_position = 0;
data_pointer = (char*)&m_hit.board;
@ -98,7 +104,167 @@ namespace Navigator {
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
}
/*
Create C-style binary buffer from the data struct. This is to mimic the raw data source,
which will have no padding, or any other normal struct related features (and note that the intrisic
ordering from compass ensures that padding would be placed). Here we also fragment a hit (split over 2 buffers)
to test ability to handle real life conditions
Note -- as implemented highlights one issue with CAEN CoMPASS sources. No headers are left in the stream,
so one must be CERTAIN to attach Navigator to the socket before running, otherwise fragments could lead to
scrambled unpacking order. (i.e. sometimes this will work for the test and sometimes it won't)
*/
void TCPConnection::CreateBinaryBufferFragmented()
{
static std::atomic<bool> even = true;
m_hit.board = 8;
m_hit.channel = 1;
m_hit.timestamp = m_hit.timestamp + s_timestep;
m_hit.lgate = 512;
m_hit.sgate = 0;
m_hit.flags = 0;
m_hit.Ns = 0;
char* data_pointer;
int buffer_position = 0;
if (even)
{
data_pointer = (char*)&m_hit.board;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.channel;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.timestamp;
for (int i = 0; i < 8; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.lgate;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.sgate;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.flags;
for (int i = 0; i < 4; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.Ns;
for (int i = 0; i < 4; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.board;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.channel;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
m_hit.timestamp += s_timestep;
data_pointer = (char*)&m_hit.timestamp;
for (int i = 0; i < 8; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
even = false;
}
else
{
data_pointer = (char*)&m_hit.lgate;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.sgate;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.flags;
for (int i = 0; i < 4; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.Ns;
for (int i = 0; i < 4; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.board;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.channel;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.timestamp;
for (int i = 0; i < 8; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.lgate;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.sgate;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.flags;
for (int i = 0; i < 4; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.Ns;
for (int i = 0; i < 4; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
even = true;
}
}
/* TCPServer */

View File

@ -34,6 +34,7 @@ namespace Navigator {
private:
void HandleWrite(const asio::error_code& ec, size_t bytes);
void CreateBinaryBuffer();
void CreateBinaryBufferFragmented();
asio::ip::tcp::socket m_socket;
std::vector<char> m_buffer;