mirror of https://github.com/gwm17/Specter.git synced 2024-11-22 18:28:52 -05:00

Added a test server for debugging online connections. Fixed numerous errors in CompassOnlineSource and PhysicsEventBuilder. Now passes a simple test; should be ready for a full connection to CoMPASS. Also added a Timer for profiling (eventually).

This commit is contained in:
Gordon McCann 2022-04-11 09:41:24 -04:00
parent bed9b0fc1e
commit 8ea527c69f
8 changed files with 316 additions and 3 deletions

View File

@@ -16,6 +16,7 @@ public:
Navigator::Application()
{
PushLayer(new Navigator::SPSInputLayer());
//PushLayer(new Navigator::TestServerLayer());
PushAnalysisStage(new Navigator::SPSAnalysisStage());
}
};
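Enabling the test server is presumably just a matter of uncommenting the PushLayer call above, so the layer hosting the local TCP server runs alongside the analysis. A sketch of the enabled constructor (hypothetical usage, not part of this commit):

Navigator::Application()
{
    PushLayer(new Navigator::SPSInputLayer());
    PushLayer(new Navigator::TestServerLayer()); //serves imitation CoMPASS data on localhost port 51489
    PushAnalysisStage(new Navigator::SPSAnalysisStage());
}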

View File

@@ -32,5 +32,6 @@
#include "Navigator/Core/SpectrumManager.h"
#include "Navigator/Core/Layer.h"
#include "Navigator/Events/Event.h"
#include "Navigator/Utils/TestServerLayer.h"
#endif

View File

@@ -1,7 +1,9 @@
/*
CompassOnlineSource.cpp
A data source for online CAEN CoMPASS Data. Creates a tcp socket, connects to the remote source, and pulls data to a buffer. Data is then converted
from the CAEN CoMPASS format to the native NavData format. Uses asio as the networking library (see asio docs).
from the CAEN CoMPASS format to the native NavData format. Uses asio as the networking library (see asio docs). Note that here we use synchronous reads since we
need to know whether the buffer was filled, but we set the socket to non-blocking since we don't want the entire process to hang on attempting a connection or waiting
for data to come over the pipe. We handle the case of an unfilled buffer internally.
IMPORTANT
Navigator wants a unique ID on each hit. To do this we use the idiom:
@@ -35,6 +37,7 @@ namespace Navigator {
asio::ip::tcp::resolver resolver(m_socketContext);
auto end_points = resolver.resolve(hostname, port);
asio::connect(m_socket, end_points);
m_socket.non_blocking(true); //Set the connection as non-blocking
}
catch (asio::system_error& error)
{
@@ -87,6 +90,10 @@ namespace Navigator {
NAV_WARN("CompassOnlineSource invalidated by host. Invalidating and detaching source.");
m_validFlag = false;
}
else if (code == asio::error::would_block) //Ignore cases where the socket would have blocked
{
return;
}
else if (code)
{
NAV_ERROR("CompassOnlineSource recieved unexpected error from host. Error message: {0}", code.message());
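A minimal sketch of the synchronous, non-blocking read pattern the header comment describes (assumed names, not the actual FillBuffer body): the synchronous call reports how many bytes arrived, while the non-blocking socket turns "no data yet" into asio::error::would_block instead of hanging.

asio::error_code code;
size_t bytesRead = m_socket.read_some(asio::buffer(m_buffer), code); //synchronous: we learn how much of the buffer was filled
if (code == asio::error::eof)
    m_validFlag = false;          //host closed the connection; invalidate the source
else if (code == asio::error::would_block)
    return;                       //non-blocking: nothing available yet, try again next cycle
else if (code)
    NAV_ERROR("Unexpected asio error: {0}", code.message());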

View File

@@ -1,7 +1,9 @@
/*
CompassOnlineSource.h
A data source for online CAEN CoMPASS Data. Creates a tcp socket, connects to the remote source, and pulls data to a buffer. Data is then converted
from the CAEN CoMPASS format to the native NavData format. Uses asio as the networking library (see asio docs).
from the CAEN CoMPASS format to the native NavData format. Uses asio as the networking library (see asio docs). Note that here we use synchronous reads since we
need to know whether the buffer was filled, but we set the socket to non-blocking since we don't want the entire process to hang on attempting a connection or waiting
for data to come over the pipe. We handle the case of an unfilled buffer internally.
IMPORTANT
Navigator wants a unique ID on each hit. To do this we use the idiom:
@@ -34,7 +36,7 @@ namespace Navigator {
void FillBuffer();
void GetHit();
std::vector<char> m_buffer;
static constexpr size_t m_bufferSize = 1000000; //Max amount of data we allow the source to buffer in. I don't think this should ever be maxed?
static constexpr size_t m_bufferSize = 24000; //Max amount of data we allow the source to buffer in. I don't think this should ever be maxed?
const int m_nchannels_per_board = 16; //IMPORTANT: Used for ID'ing channels uniquely. If you use boards with 32 or 8 or 64 channels you must change this! If you mix boards with
//different numbers of channels, you will have to find a different id solution.
char* m_bufferIter;
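For illustration, the unique-ID idiom referenced in the header comment (the exact formula is truncated in this excerpt, so this is an assumption) would combine board and channel using the channel count above:

//Hypothetical illustration: with 16 channels per board, board 8 / channel 1 -> id 129
uint32_t id = hit.board * m_nchannels_per_board + hit.channel;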

View File

@@ -23,6 +23,9 @@ namespace Navigator {
bool PhysicsEventBuilder::AddDatumToEvent(const NavData& datum)
{
if (datum.timestamp == 0) //Ignore empty data (need a valid timestamp)
return false;
if (m_eventStartTime == 0) //first ever event
{
m_eventStartTime = datum.timestamp;
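A rough sketch of how a timestamp-window event builder like this typically proceeds from here (the names m_coincidentWindow, m_event, and m_readyEvent are assumptions, not taken from this diff):

if (datum.timestamp - m_eventStartTime < m_coincidentWindow)
{
    m_event.push_back(datum);           //still inside the coincidence window
    return false;                       //event not finished yet
}
else
{
    m_readyEvent = m_event;             //close out the finished event
    m_event.clear();
    m_event.push_back(datum);           //this datum starts the next event
    m_eventStartTime = datum.timestamp;
    return true;                        //caller can now retrieve the built event
}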

View File

@@ -0,0 +1,176 @@
/*
TestServerLayer.cpp
This set of classes represents a test layer containing a TCP server and client connection class. This is only to be used as a way to debug
online data sources. You can have this server post data to a port on localhost and then have the analysis thread connect to that port and analyze the
imitated data. Default setup is for a CompassOnlineSource, but of course this could be easily modified for your test of choice. For more details,
see the asio Tutorials, specifically the asynchronous daytime server.
Here we use async methods, as we do not want the whole project to hang on creating a successful client/server connection. (Also because it was fun
to learn.)
GWM -- April 2022
NOTE: There are NO security features on this server/connection. Please use with care on a protected network/firewalled machine.
*/
#include "TestServerLayer.h"
namespace Navigator {
/* TCPConnection */
TCPConnection::TCPConnection(asio::io_context& context) :
m_socket(context), m_buffer(24)
{
}
//This function is somewhat misnamed in our scheme; Write would be a clearer name.
void TCPConnection::Start()
{
CreateBinaryBuffer(); //Generate Buffer
//Actually write the buffer to the socket. Use std::bind to set a callback function for error handling or any other server side actions
asio::async_write(m_socket, asio::buffer(m_buffer),
std::bind(&TCPConnection::HandleWrite, this, std::placeholders::_1, std::placeholders::_2));
}
//Server-side connection actions upon attempting a write; only used for debugging
void TCPConnection::HandleWrite(const asio::error_code& ec, size_t bytes)
{
//NAV_INFO("Writer result: Asio Error -- {0} Amount transferred={1}", ec.message(), bytes);
}
/*
Create a C-style binary buffer from the data struct. This mimics the raw data source, which has
no padding or any other normal struct-related features (and note that the intrinsic field
ordering from CoMPASS means a plain struct would contain padding)
*/
void TCPConnection::CreateBinaryBuffer()
{
m_hit.board = 8;
m_hit.channel = 1;
m_hit.timestamp = m_hit.timestamp + s_timestep;
m_hit.lgate = 512;
m_hit.sgate = 0;
m_hit.flags = 0;
m_hit.Ns = 0;
char* data_pointer;
int buffer_position = 0;
data_pointer = (char*)&m_hit.board;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.channel;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.timestamp;
for (int i = 0; i < 8; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.lgate;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.sgate;
for (int i = 0; i < 2; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.flags;
for (int i = 0; i < 4; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
data_pointer = (char*)&m_hit.Ns;
for (int i = 0; i < 4; i++)
{
m_buffer[buffer_position] = *(data_pointer + i);
buffer_position++;
}
}
/* TCPServer */
TCPServer::TCPServer(asio::io_context& context) :
m_contextRef(context), m_acceptor(context, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), 51489)), m_isAccepted(false)
{
}
//Attempt to create a connection
std::shared_ptr<TCPConnection> TCPServer::StartAccept()
{
std::shared_ptr<TCPConnection> new_connection = TCPConnection::CreateConnection(m_contextRef);
m_acceptor.async_accept(new_connection->Socket(), std::bind(&TCPServer::HandleAccept, this, new_connection, std::placeholders::_1));
return new_connection;
}
//If connection successful, attempt to write as a test.
void TCPServer::HandleAccept(std::shared_ptr<TCPConnection> connection, const asio::error_code& ec)
{
if (!ec)
{
m_isAccepted = true;
connection->Start();
}
else
{
m_isAccepted = false;
NAV_INFO("TCPServer HandleAccept found Error: {0}", ec.message());
}
StartAccept();
}
/* TestServerLayer */
TestServerLayer::TestServerLayer() :
Layer("TestServer"), m_server(nullptr), m_connection(nullptr)
{
}
TestServerLayer::~TestServerLayer()
{
if (m_server)
delete m_server;
}
//Create a server, get the connection, poll actions
void TestServerLayer::OnAttach()
{
try
{
m_server = new TCPServer(m_context);
m_connection = m_server->StartAccept();
m_context.poll();
}
catch (std::exception& e)
{
NAV_INFO("ServerLayer Error: {0}", e.what());
}
}
void TestServerLayer::OnDetach()
{
}
//Tell to write, then poll actions
void TestServerLayer::OnUpdate()
{
m_connection->Start();
m_context.poll();
}
}
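For reference, the byte-by-byte copy in CreateBinaryBuffer writes 2 (board) + 2 (channel) + 8 (timestamp) + 2 (lgate) + 2 (sgate) + 4 (flags) + 4 (Ns) = 24 bytes, which matches the m_buffer(24) size set in the TCPConnection constructor.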

View File

@@ -0,0 +1,77 @@
/*
TestServerLayer.h
This set of classes represents a test layer containing a TCP server and client connection class. This is only to be used as a way to debug
online data sources. You can have this server post data to a port on localhost and then have the analysis thread connect to that port and analyze the
imitated data. Default setup is for a CompassOnlineSource, but of course this could be easily modified for your test of choice. For more details,
see the asio Tutorials, specifically the asynchronous daytime server.
Here we use async methods, as we do not want the whole project to hang on creating a successful client/server connection. (Also because it was fun
to learn.)
GWM -- April 2022
NOTE: There are NO security features on this server/connection. Please use with care on a protected network/firewalled machine.
*/
#ifndef TEST_SERVER_LAYER_H
#define TEST_SERVER_LAYER_H
#include "Navigator/Core/Layer.h"
#include "Navigator/Physics/Caen/CompassHit.h"
#include "asio.hpp"
namespace Navigator {
/*Server-side TCP Connection to the open port*/
class TCPConnection
{
public:
TCPConnection(asio::io_context& context);
inline static std::shared_ptr<TCPConnection> CreateConnection(asio::io_context& context) { return std::make_shared<TCPConnection>(context); }
inline asio::ip::tcp::socket& Socket() { return m_socket; }
void Start();
private:
void HandleWrite(const asio::error_code& ec, size_t bytes);
void CreateBinaryBuffer();
asio::ip::tcp::socket m_socket;
std::vector<char> m_buffer;
CompassHit m_hit;
static constexpr uint64_t s_timestep = 2000000;
};
/*Server itself*/
class TCPServer
{
public:
TCPServer(asio::io_context& context);
inline bool IsAccepted() { return m_isAccepted; }
std::shared_ptr<TCPConnection> StartAccept();
private:
void HandleAccept(std::shared_ptr<TCPConnection> connection, const asio::error_code& error);
asio::io_context& m_contextRef;
asio::ip::tcp::acceptor m_acceptor;
bool m_isAccepted;
};
class TestServerLayer : public Layer
{
public:
TestServerLayer();
virtual ~TestServerLayer();
virtual void OnAttach() override;
virtual void OnDetach() override;
virtual void OnUpdate() override;
private:
asio::io_context m_context;
TCPServer* m_server;
std::shared_ptr<TCPConnection> m_connection;
};
}
#endif

View File

@@ -0,0 +1,46 @@
#ifndef TIMER_H
#define TIMER_H
#include <chrono>
namespace Navigator {
class Timer
{
public:
Timer(const char* name) :
m_name(name), m_stopped(false)
{
m_startTime = Clock::now();
}
~Timer()
{
if (!m_stopped)
Stop();
}
void Stop()
{
auto stopTime = Clock::now();
int64_t start = std::chrono::time_point_cast<std::chrono::microseconds>(m_startTime).time_since_epoch().count();
int64_t stop = std::chrono::time_point_cast<std::chrono::microseconds>(stopTime).time_since_epoch().count();
float duration = (stop - start)*0.001;
m_stopped = true;
NAV_INFO("{1} -- Duration: {0} ms", m_name, duration);
}
private:
using Time = std::chrono::steady_clock::time_point;
using Clock = std::chrono::steady_clock;
const char* m_name;
Time m_startTime;
bool m_stopped;
};
}
#endif
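The intended usage is presumably scope-based profiling: construct a Timer at the top of a block and it logs the elapsed milliseconds when it goes out of scope (the call site below is hypothetical):

void SomeExpensiveFunction() //hypothetical call site
{
    Timer timer("SomeExpensiveFunction");
    //...work to be profiled...
}   //timer destructor fires here and calls Stop(), logging the duration in ms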