diff --git a/Navigator/src/Navigator/Physics/Caen/CompassOnlineSource.cpp b/Navigator/src/Navigator/Physics/Caen/CompassOnlineSource.cpp index 2f13545..187d3f8 100644 --- a/Navigator/src/Navigator/Physics/Caen/CompassOnlineSource.cpp +++ b/Navigator/src/Navigator/Physics/Caen/CompassOnlineSource.cpp @@ -1,7 +1,7 @@ /* CompassOnlineSource.cpp - A data source for online CAEN CoMPASS Data. Creates a tcp socket, connects to the remote source, and pulls data to a buffer. Data is then converted - from the CAEN CoMPASS format to the native NavData format. Uses asio as the networking library (see asio docs). Note that here we use syncrhonous since we + A data source for online CAEN CoMPASS Data. Uses TCPClient to connect to the CoMPASS server. Data is then converted + from the CAEN CoMPASS format to the native NavData format. Note that here we use syncrhonous since we need to know if the buffer is/was filled, however we use non-blocking since we don't want the entire process to hang on attempting a connection or waiting for data to come over the pipe. We handle the case of an un-filled buffer internally. @@ -12,57 +12,52 @@ digitizers have the SAME number of channels. By default CompassRun assumes 16 channels per board, as this is what is used with the SE-SPS setup at FoxLab. If you use a different set of boards, CHANGE THIS VALUE! If you use mixed boards, you will need to invent a new id scheme altogether. - GWM -- Feb 2022 + ADDITIONALLY + CoMPASS servers provide no stream side information on the state of a transfer (verified via communication w/ CAEN). That is: there are no headers or enders on the data transfers. + This forces us to use the size of a single CoMPASS datum (CompassHit) to determine the state of a transfer. If the read buffer size is not a whole multiple of CompassHits, the data + was determined to be fragmented. This has a huge drawback: in general there is no guarantee that the first byte of the transferred data is the first byte of a CompassHit. This means that + Navigator MUST be connected to the CoMPASS server BEFORE starting the aquisition. Otherwise the program could endup in a state of scrambled unpacking (also verified w/ CAEN). + Maybe we can get them to change this? Headers reeaaally should exist for transfers like this. + + GWM -- April 2022 */ #include "CompassOnlineSource.h" namespace Navigator { CompassOnlineSource::CompassOnlineSource(const std::string& hostname, const std::string& port) : - DataSource(), m_bufferIter(nullptr), m_bufferEnd(nullptr), m_socket(m_socketContext) + DataSource(), m_bufferIter(nullptr), m_bufferEnd(nullptr) { - m_buffer.resize(m_bufferSize); - InitSocket(hostname, port); + InitConnection(hostname, port); } CompassOnlineSource::~CompassOnlineSource() {} - void CompassOnlineSource::InitSocket(const std::string& hostname, const std::string& port) + void CompassOnlineSource::InitConnection(const std::string& hostname, const std::string& port) { m_validFlag = false; - //asio tends to work in terms of exceptions, which we avoid in the rest of this project. - //If the connection is unsuccessful, an exception is thrown, which we parse and print to the terminal. - try + m_connection.Connect(hostname, port); + if (m_connection.IsOpen()) { - asio::ip::tcp::resolver resolver(m_socketContext); - auto end_points = resolver.resolve(hostname, port); - asio::connect(m_socket, end_points); - m_socket.non_blocking(true); //Set the connection as non-blocking + m_validFlag = true; } - catch (asio::system_error& error) - { - NAV_ERROR("Unable to connect to remote port for CompassOnlineSource! Error Code: {0}", error.what()); - return; - } - - NAV_INFO("Successfully connected to host {0} on port {1}", hostname, port); - m_validFlag = true; } const NavData& CompassOnlineSource::GetData() { + size_t range = m_bufferEnd - m_bufferIter; //how much buffer we have left if (!IsValid()) { NAV_ERROR("Attempting to access invalid source at CompassOnlineSource!"); m_datum = NavData(); return m_datum; } - else if (m_bufferIter == nullptr || m_bufferIter == m_bufferEnd) + else if (m_bufferIter == nullptr || range < m_datasize || m_bufferIter == m_bufferEnd) //If no buffer/buffer completely used/buffer fragmented fill { FillBuffer(); } - if (m_bufferIter != m_bufferEnd) + if (m_bufferIter != m_bufferEnd && range >= m_datasize)//If buffer and enough data for a hit, get it GetHit(); else { @@ -80,26 +75,21 @@ namespace Navigator { void CompassOnlineSource::FillBuffer() { - asio::error_code code; + std::vector recieved = m_connection.Read(); + //If we didn't finish the last buffer toss all of the stuff we used and then append the recieved data + //Otherwise, copy over the recieved buffer. Note lack of vector::resize, vector::reserve. Intentional for performance. + //The amount of copying/resizing is best handled by the std (according to multiple references) + if (m_bufferIter != m_bufferEnd) + { + size_t pos = m_bufferEnd - m_bufferIter; + m_currentBuffer.erase(m_currentBuffer.begin(), m_currentBuffer.begin() + (m_currentBuffer.size() - pos)); //remove used bytes + m_currentBuffer.insert(m_currentBuffer.end(), recieved.begin(), recieved.end()); + } + else + m_currentBuffer = recieved; - size_t length = m_socket.read_some(asio::buffer(m_buffer), code); - m_bufferEnd = m_buffer.data() + length; - m_bufferIter = m_buffer.data(); - if (code == asio::error::eof) - { - NAV_WARN("CompassOnlineSource invalidated by host. Invalidating and detaching source."); - m_validFlag = false; - } - else if (code == asio::error::would_block) //Ignore cases where the socket would have blocked - { - return; - } - else if (code) - { - NAV_ERROR("CompassOnlineSource recieved unexpected error from host. Error message: {0}", code.message()); - NAV_WARN("Invalidating and detaching source."); - m_validFlag = false; - } + m_bufferIter = m_currentBuffer.data(); + m_bufferEnd = m_currentBuffer.data() + m_currentBuffer.size(); } void CompassOnlineSource::GetHit() diff --git a/Navigator/src/Navigator/Physics/Caen/CompassOnlineSource.h b/Navigator/src/Navigator/Physics/Caen/CompassOnlineSource.h index a84d847..73a7745 100644 --- a/Navigator/src/Navigator/Physics/Caen/CompassOnlineSource.h +++ b/Navigator/src/Navigator/Physics/Caen/CompassOnlineSource.h @@ -1,7 +1,7 @@ /* CompassOnlineSource.h - A data source for online CAEN CoMPASS Data. Creates a tcp socket, connects to the remote source, and pulls data to a buffer. Data is then converted - from the CAEN CoMPASS format to the native NavData format. Uses asio as the networking library (see asio docs). Note that here we use syncrhonous since we + A data source for online CAEN CoMPASS Data. Uses TCPClient to connect to the CoMPASS server. Data is then converted + from the CAEN CoMPASS format to the native NavData format. Note that here we use syncrhonous since we need to know if the buffer is/was filled, however we use non-blocking since we don't want the entire process to hang on attempting a connection or waiting for data to come over the pipe. We handle the case of an un-filled buffer internally. @@ -12,14 +12,21 @@ digitizers have the SAME number of channels. By default CompassRun assumes 16 channels per board, as this is what is used with the SE-SPS setup at FoxLab. If you use a different set of boards, CHANGE THIS VALUE! If you use mixed boards, you will need to invent a new id scheme altogether. - GWM -- Feb 2022 + ADDITIONALLY + CoMPASS servers provide no stream side information on the state of a transfer (verified via communication w/ CAEN). That is: there are no headers or enders on the data transfers. + This forces us to use the size of a single CoMPASS datum (CompassHit) to determine the state of a transfer. If the read buffer size is not a whole multiple of CompassHits, the data + was determined to be fragmented. This has a huge drawback: in general there is no guarantee that the first byte of the transferred data is the first byte of a CompassHit. This means that + Navigator MUST be connected to the CoMPASS server BEFORE starting the aquisition. Otherwise the program could endup in a state of scrambled unpacking (also verified w/ CAEN). + Maybe we can get them to change this? Headers reeaaally should exist for transfers like this. + + GWM -- April 2022 */ #ifndef COMPASS_ONLINE_SOURCE_H #define COMPASS_ONLINE_SOURCE_H #include "Navigator/Physics/DataSource.h" +#include "Navigator/Utils/TCPClient.h" #include "CompassHit.h" -#include "asio.hpp" namespace Navigator { @@ -32,19 +39,20 @@ namespace Navigator { virtual const NavData& GetData() override; private: - void InitSocket(const std::string& hostname, const std::string& port); + void InitConnection(const std::string& hostname, const std::string& port); void FillBuffer(); void GetHit(); - std::vector m_buffer; - static constexpr size_t m_bufferSize = 24000; //Max amount of data we allow the source to buffer in. I don't think this should ever be maxed? + + std::vector m_currentBuffer; + std::vector m_fragment; + static constexpr int m_datasize = 24; //size of CoMPASS hit in bytes, change as needed (if for example you have calibrated energies) const int m_nchannels_per_board = 16; //IMPORTANT: Used for ID'ing channels uniquely. If you use boards with 32 or 8 or 64 channels you must change this! If you mix boards with //different numbers of channels, you will have to find a different id solution. char* m_bufferIter; char* m_bufferEnd; CompassHit m_currentHit; - asio::io_context m_socketContext; - asio::ip::tcp::socket m_socket; + TCPClient m_connection; }; diff --git a/Navigator/src/Navigator/Utils/TCPClient.cpp b/Navigator/src/Navigator/Utils/TCPClient.cpp new file mode 100644 index 0000000..c002f63 --- /dev/null +++ b/Navigator/src/Navigator/Utils/TCPClient.cpp @@ -0,0 +1,85 @@ +/* + TCPClient.h + Navigator's TCP client rep using asio. Contains very basic ability to read and write from a tcp socket in a non-blocking, synchronous method. + See asio docs for more details and examples on how to use. + + GWM -- April 2022 + + Note: the write functionality has not been verified. Should be fine, but test before using. +*/ +#include "TCPClient.h" + +namespace Navigator { + + TCPClient::TCPClient() : + m_socket(m_context) + { + m_readBuffer.resize(s_readBufferSize); + } + + TCPClient::TCPClient(const std::string& host, const std::string& port) : + m_socket(m_context) + { + m_readBuffer.resize(s_readBufferSize); + Connect(host, port); + } + + TCPClient::~TCPClient() + { + Close(); + } + + void TCPClient::Connect(const std::string& host, const std::string& port) + { + try + { + asio::ip::tcp::resolver resolver(m_context); + auto end_points = resolver.resolve(host, port); + asio::connect(m_socket, end_points); + m_socket.non_blocking(true); //Set the connection as non-blocking + } + catch (asio::system_error& e) + { + NAV_ERROR("Unable to connect to remote port for TCPClient! Error Code: {0}", e.what()); + } + } + + std::vector TCPClient::Read() + { + asio::error_code code; + size_t length = m_socket.read_some(asio::buffer(m_readBuffer, m_readBuffer.size()), code); + if (code == asio::error::eof) + { + NAV_WARN("Server has closed connection. Closing the TCPClient"); + Close(); + } + else if (code && code != asio::error::would_block) + { + NAV_ERROR("TCPClient::Read recieved unexpected error from host. Error message: {0}", code.message()); + NAV_WARN("Closing the socket."); + Close(); + } + return std::vector(m_readBuffer.begin(), m_readBuffer.begin()+length); + } + + //untested, not currently used. + size_t TCPClient::Write(const std::vector& data) + { + asio::error_code code; + m_writeBuffer = data; + size_t length = m_socket.write_some(asio::buffer(m_writeBuffer, m_writeBuffer.size()), code); + if (code == asio::error::eof) + { + NAV_WARN("Server has closed connection. Closing the TCPClient."); + Close(); + } + else if(code && code != asio::error::would_block) + { + NAV_ERROR("TCPClient::Write recieved unexpected error from host. Error message: {0}", code.message()); + NAV_WARN("Closing the socket"); + Close(); + } + + return length; + } +} \ No newline at end of file diff --git a/Navigator/src/Navigator/Utils/TCPClient.h b/Navigator/src/Navigator/Utils/TCPClient.h new file mode 100644 index 0000000..d5fbdc7 --- /dev/null +++ b/Navigator/src/Navigator/Utils/TCPClient.h @@ -0,0 +1,41 @@ +/* + TCPClient.h + Navigator's TCP client rep using asio. Contains very basic ability to read and write from a tcp socket in a non-blocking, synchronous method. + See asio docs for more details and examples on how to use. + + GWM -- April 2022 + + Note: the write functionality has not been verified. Should be fine, but test before using. +*/ +#ifndef TCPCLIENT_H +#define TCPCLIENT_H + +#include + +namespace Navigator { + + class TCPClient + { + public: + TCPClient(); + TCPClient(const std::string& host, const std::string& port); + ~TCPClient(); + + void Connect(const std::string& host, const std::string& port); + std::vector Read(); + size_t Write(const std::vector& data); + inline void Close() { if(IsOpen()) m_socket.close(); } + inline bool IsOpen() { return m_socket.is_open(); } + + private: + + std::vector m_readBuffer; + std::vector m_writeBuffer; + + static constexpr size_t s_readBufferSize = 24000; // reserve some space for the read buffer + asio::io_context m_context; + asio::ip::tcp::socket m_socket; + }; +} + +#endif \ No newline at end of file diff --git a/Navigator/src/Navigator/Utils/TestServerLayer.cpp b/Navigator/src/Navigator/Utils/TestServerLayer.cpp index a0bb446..25d3798 100644 --- a/Navigator/src/Navigator/Utils/TestServerLayer.cpp +++ b/Navigator/src/Navigator/Utils/TestServerLayer.cpp @@ -19,7 +19,7 @@ namespace Navigator { /* TCPConnection */ TCPConnection::TCPConnection(asio::io_context& context) : - m_socket(context), m_buffer(24) + m_socket(context), m_buffer(36) { } @@ -27,15 +27,20 @@ namespace Navigator { void TCPConnection::Start() { CreateBinaryBuffer(); //Generate Buffer - + //CreateBinaryBufferFragmented(); //Generate fragmented buffer //Actually write the buffer to the socket. Use std::bind to set a callback function for error handling or any other server side actions - asio::async_write(m_socket, asio::buffer(m_buffer), + asio::async_write(m_socket, asio::buffer(m_buffer, m_buffer.size()), std::bind(&TCPConnection::HandleWrite, this, std::placeholders::_1, std::placeholders::_2)); } //Server-side connection actions upon attempting write, only on for debugging void TCPConnection::HandleWrite(const asio::error_code& ec, size_t bytes) { + /* + static int i = 0; + if(!ec) + NAV_INFO("Number written: {0}", ++i); + */ //NAV_INFO("Writer result: Asio Error -- {0} Amount transferred={1}", ec.message(), bytes); } @@ -54,6 +59,7 @@ namespace Navigator { m_hit.flags = 0; m_hit.Ns = 0; + char* data_pointer; int buffer_position = 0; data_pointer = (char*)&m_hit.board; @@ -98,7 +104,167 @@ namespace Navigator { m_buffer[buffer_position] = *(data_pointer + i); buffer_position++; } + } + /* + Create C-style binary buffer from the data struct. This is to mimic the raw data source, + which will have no padding, or any other normal struct related features (and note that the intrisic + ordering from compass ensures that padding would be placed). Here we also fragment a hit (split over 2 buffers) + to test ability to handle real life conditions + + Note -- as implemented highlights one issue with CAEN CoMPASS sources. No headers are left in the stream, + so one must be CERTAIN to attach Navigator to the socket before running, otherwise fragments could lead to + scrambled unpacking order. (i.e. sometimes this will work for the test and sometimes it won't) + */ + void TCPConnection::CreateBinaryBufferFragmented() + { + static std::atomic even = true; + m_hit.board = 8; + m_hit.channel = 1; + m_hit.timestamp = m_hit.timestamp + s_timestep; + m_hit.lgate = 512; + m_hit.sgate = 0; + m_hit.flags = 0; + m_hit.Ns = 0; + + + char* data_pointer; + int buffer_position = 0; + if (even) + { + data_pointer = (char*)&m_hit.board; + for (int i = 0; i < 2; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.channel; + for (int i = 0; i < 2; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.timestamp; + for (int i = 0; i < 8; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.lgate; + for (int i = 0; i < 2; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.sgate; + for (int i = 0; i < 2; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.flags; + for (int i = 0; i < 4; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.Ns; + for (int i = 0; i < 4; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.board; + for (int i = 0; i < 2; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.channel; + for (int i = 0; i < 2; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + m_hit.timestamp += s_timestep; + data_pointer = (char*)&m_hit.timestamp; + for (int i = 0; i < 8; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + even = false; + } + else + { + data_pointer = (char*)&m_hit.lgate; + for (int i = 0; i < 2; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.sgate; + for (int i = 0; i < 2; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.flags; + for (int i = 0; i < 4; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.Ns; + for (int i = 0; i < 4; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.board; + for (int i = 0; i < 2; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.channel; + for (int i = 0; i < 2; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.timestamp; + for (int i = 0; i < 8; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.lgate; + for (int i = 0; i < 2; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.sgate; + for (int i = 0; i < 2; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.flags; + for (int i = 0; i < 4; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + data_pointer = (char*)&m_hit.Ns; + for (int i = 0; i < 4; i++) + { + m_buffer[buffer_position] = *(data_pointer + i); + buffer_position++; + } + even = true; + } } /* TCPServer */ diff --git a/Navigator/src/Navigator/Utils/TestServerLayer.h b/Navigator/src/Navigator/Utils/TestServerLayer.h index 1ab7c8a..00867ad 100644 --- a/Navigator/src/Navigator/Utils/TestServerLayer.h +++ b/Navigator/src/Navigator/Utils/TestServerLayer.h @@ -34,6 +34,7 @@ namespace Navigator { private: void HandleWrite(const asio::error_code& ec, size_t bytes); void CreateBinaryBuffer(); + void CreateBinaryBufferFragmented(); asio::ip::tcp::socket m_socket; std::vector m_buffer;