1
0
Fork 0
mirror of https://github.com/gwm17/Specter.git synced 2024-11-22 10:18:50 -05:00

Add Ritual data source. Cleanup DataSource, PhysicsEventBuilder, child classes.

This commit is contained in:
Gordon McCann 2023-03-30 20:43:02 -04:00
parent 6c5b7d17a7
commit 88173a16cf
20 changed files with 366 additions and 48 deletions

View File

@ -125,6 +125,10 @@ target_sources(Specter PRIVATE
Specter/Utils/RandomGenerator.h
Specter/Utils/ThreadSafeQueue.h
Specter/Core/EntryPoint.h
Specter/Physics/ritual/RitualOnlineSource.h
Specter/Physics/ritual/RitualOnlineSource.cpp
Specter/Physics/ritual/RitualClient.cpp
Specter/Physics/ritual/RitualClient.h
)
#ImPlot sources

View File

@ -31,7 +31,7 @@ namespace Specter {
SPEC_PROFILE_FUNCTION();
static bool result = false;
static std::vector<DataSource::SourceType> availTypes = { DataSource::SourceType::CompassOnline, DataSource::SourceType::CompassOffline, DataSource::SourceType::DaqromancyOnline,
DataSource::SourceType::DaqromancyOffline, DataSource::SourceType::CharonOnline };
DataSource::SourceType::DaqromancyOffline, DataSource::SourceType::CharonOnline, DataSource::SourceType::RitualOnline };
result = false;
if (m_openFlag)
{
@ -114,6 +114,12 @@ namespace Specter {
ImGui::InputText("Hostname", &m_args.location);
ImGui::InputText("Port", &m_args.port);
}
else if (m_args.type == DataSource::SourceType::RitualOnline)
{
ImGui::InputText("Hostname", &m_args.location);
ImGui::InputText("Port", &m_args.port);
ImGui::InputScalar("Coinc. Window (ps)", ImGuiDataType_U64, &m_args.coincidenceWindow);
}
if (ImGui::Button("Ok"))
{

View File

@ -80,10 +80,7 @@ namespace Specter {
m_datum.timestamp = m_currentHit.timestamp;
m_datum.id = Utilities::GetBoardChannelUUID(m_currentHit.board, m_currentHit.channel);
if(m_eventBuilder.AddDatum(m_datum))
{
m_isEventReady = true;
}
m_eventBuilder.AddDatum(m_datum);
}
void CompassOnlineSource::FillBuffer()

View File

@ -40,11 +40,13 @@ namespace Specter {
virtual ~CompassOnlineSource() override;
virtual void ProcessData() override;
virtual const std::vector<SpecEvent>& GetEvents() override
virtual std::vector<SpecEvent> GetEvents() override
{
m_isEventReady = false;
return m_eventBuilder.GetReadyEvents();
auto temp = m_eventBuilder.GetReadyEvents();
m_eventBuilder.ClearReadyEvents();
return temp;
}
virtual const bool IsEventReady() const override { return m_eventBuilder.IsEventReady(); }
private:
void InitConnection(const std::string& hostname, const std::string& port);

View File

@ -146,10 +146,7 @@ namespace Specter {
m_datum.timestamp = m_hit.timestamp;
m_datum.id = Utilities::GetBoardChannelUUID(m_hit.board, m_hit.channel);
if(m_eventBuilder.AddDatum(m_datum))
{
m_isEventReady = true;
}
m_eventBuilder.AddDatum(m_datum);
}
}

View File

@ -37,14 +37,16 @@ namespace Specter {
CompassRun(const std::string& dir, uint64_t coincidenceWindow);
virtual ~CompassRun();
virtual void ProcessData() override;
virtual const std::vector<SpecEvent>& GetEvents() override
virtual std::vector<SpecEvent> GetEvents() override
{
m_isEventReady = false;
return m_eventBuilder.GetReadyEvents();
auto temp = m_eventBuilder.GetReadyEvents();
m_eventBuilder.ClearReadyEvents();
return temp;
}
inline void SetDirectory(const std::string& dir) { m_directory = dir; CollectFiles(); }
inline void SetShiftMap(const std::string& filename) { m_smap.SetFile(filename); }
void SetDirectory(const std::string& dir) { m_directory = dir; CollectFiles(); }
void SetShiftMap(const std::string& filename) { m_smap.SetFile(filename); }
virtual const bool IsEventReady() const override { return m_eventBuilder.IsEventReady(); }
private:
void CollectFiles();

View File

@ -105,9 +105,6 @@ namespace Specter {
m_datum.shortEnergy = m_dyHit.energyShort;
m_datum.timestamp = m_dyHit.timestamp;
m_datum.id = Utilities::GetBoardChannelUUID(m_dyHit.board, m_dyHit.channel);
if(m_eventBuilder.AddDatum(m_datum))
{
m_isEventReady = true;
}
m_eventBuilder.AddDatum(m_datum);
}
}

View File

@ -14,12 +14,15 @@ namespace Specter {
virtual ~DYFileSource();
virtual void ProcessData() override;
virtual const std::vector<SpecEvent>& GetEvents() override
virtual std::vector<SpecEvent> GetEvents() override
{
m_isEventReady = false;
return m_eventBuilder.GetReadyEvents();
auto temp = m_eventBuilder.GetReadyEvents();
m_eventBuilder.ClearReadyEvents();
return temp;
}
virtual const bool IsEventReady() const override { return m_eventBuilder.IsEventReady(); }
private:
void CollectFiles();
bool GetNextHit();

View File

@ -28,8 +28,7 @@ namespace Specter {
m_datum.shortEnergy = m_dyHit.energyShort;
m_datum.timestamp = m_dyHit.timestamp;
m_datum.id = Utilities::GetBoardChannelUUID(m_dyHit.board, m_dyHit.channel);
if(m_eventBuilder.AddDatum(m_datum))
m_isEventReady = true;
m_eventBuilder.AddDatum(m_datum);
}
}
}

View File

@ -13,12 +13,15 @@ namespace Specter {
virtual ~DYOnlineSource();
virtual void ProcessData() override;
virtual const std::vector<SpecEvent>& GetEvents() override
virtual std::vector<SpecEvent> GetEvents() override
{
m_isEventReady = false;
return m_eventBuilder.GetReadyEvents();
auto temp = m_eventBuilder.GetReadyEvents();
m_eventBuilder.ClearReadyEvents();
return temp;
}
virtual const bool IsEventReady() const override { return m_eventBuilder.IsEventReady(); }
private:
DaqGrimoire::DYClient m_clientConnection;
DaqGrimoire::DYListData m_dyHit;

View File

@ -12,6 +12,7 @@
#include "Daqromancy/DYFileSource.h"
#include "Daqromancy/DYOnlineSource.h"
#include "nscldaq/CharonOnlineSource.h"
#include "ritual/RitualOnlineSource.h"
namespace Specter {
@ -25,6 +26,7 @@ namespace Specter {
case DataSource::SourceType::DaqromancyOffline: return new DYFileSource(args.location, args.coincidenceWindow);
case DataSource::SourceType::DaqromancyOnline: return new DYOnlineSource(args.location, args.port, args.coincidenceWindow);
case DataSource::SourceType::CharonOnline: return new CharonOnlineSource(args.location, args.port);
case DataSource::SourceType::RitualOnline: return new RitualOnlineSource(args.location, args.port, args.coincidenceWindow);
case DataSource::SourceType::None: return nullptr;
}
SPEC_WARN("Invalid DataSourceType at CreateDataSource!");
@ -41,6 +43,7 @@ namespace Specter {
case DataSource::SourceType::DaqromancyOffline: return "DaqromancyOffline";
case DataSource::SourceType::DaqromancyOnline: return "DaqromancyOnline";
case DataSource::SourceType::CharonOnline: return "CharonOnline";
case DataSource::SourceType::RitualOnline: return "RitualOnline";
}
return "None";

View File

@ -25,23 +25,23 @@ namespace Specter {
CompassOffline,
DaqromancyOnline,
DaqromancyOffline,
CharonOnline
CharonOnline,
RitualOnline
};
DataSource(uint64_t coincidenceWindow = 0) :
m_validFlag(false), m_isEventReady(false), m_eventBuilder(coincidenceWindow)
m_validFlag(false), m_eventBuilder(coincidenceWindow)
{
}
virtual ~DataSource() {};
virtual void ProcessData() = 0;
virtual const std::vector<SpecEvent>& GetEvents() = 0;
inline bool IsValid() { return m_validFlag; }
inline bool IsEventReady() { return m_isEventReady; }
virtual std::vector<SpecEvent> GetEvents() = 0;
virtual const bool IsEventReady() const = 0;
bool IsValid() { return m_validFlag; }
protected:
bool m_validFlag;
bool m_isEventReady;
SpecData m_datum;
PhysicsEventBuilder m_eventBuilder;
};

View File

@ -26,16 +26,16 @@ namespace Specter {
{
}
bool PhysicsEventBuilder::AddDatum(const SpecData& datum)
void PhysicsEventBuilder::AddDatum(const SpecData& datum)
{
SPEC_PROFILE_FUNCTION();
if (datum.timestamp == 0) //Ignore empty data (need a valid timestamp)
return false;
return;
m_dataBuffer[m_bufferIndex] = datum;
m_bufferIndex++;
if (m_bufferIndex < s_maxDataBuffer) //If we haven't filled the buffer keep going
return false;
return;
else if (m_sortFlag)
std::sort(m_dataBuffer.begin(), m_dataBuffer.end(), [](SpecData& i, SpecData& j) { return i.timestamp < j.timestamp; });
@ -59,10 +59,9 @@ namespace Specter {
}
}
m_bufferIndex = 0;
return true;
}
const std::vector<SpecEvent>& PhysicsEventBuilder::GetReadyEvents() const
std::vector<SpecEvent> PhysicsEventBuilder::GetReadyEvents() const
{
return m_readyEvents;
}

View File

@ -21,15 +21,17 @@ namespace Specter {
PhysicsEventBuilder();
PhysicsEventBuilder(uint64_t windowSize);
~PhysicsEventBuilder();
inline void SetCoincidenceWindow(uint64_t windowSize) { m_coincWindow = windowSize; }
inline void SetSortFlag(bool flag) { m_sortFlag = flag; }
inline void ClearAll() // reset all internal structures
void SetCoincidenceWindow(uint64_t windowSize) { m_coincWindow = windowSize; }
void SetSortFlag(bool flag) { m_sortFlag = flag; }
void ClearAll() // reset all internal structures
{
m_bufferIndex = 0;
m_readyEvents.clear();
}
bool AddDatum(const SpecData& datum);
const std::vector<SpecEvent>& GetReadyEvents() const;
void ClearReadyEvents() { m_readyEvents.clear(); }
void AddDatum(const SpecData& datum);
bool IsEventReady() const { return !m_readyEvents.empty(); }
std::vector<SpecEvent> GetReadyEvents() const;
private:
bool m_sortFlag;

View File

@ -5,7 +5,7 @@
namespace Specter {
CharonOnlineSource::CharonOnlineSource(const std::string& hostname, const std::string& port) :
DataSource(0), m_client(hostname, port)
DataSource(0), m_isEventReady(false), m_client(hostname, port)
{
m_validFlag = m_client.IsConnected();
m_readyEvents.emplace_back();

View File

@ -14,16 +14,18 @@ namespace Specter {
virtual ~CharonOnlineSource();
virtual void ProcessData() override;
virtual const std::vector<SpecEvent>& GetEvents() override
virtual std::vector<SpecEvent> GetEvents() override
{
m_isEventReady = false;
return m_readyEvents;
}
virtual const bool IsEventReady() const override { return m_isEventReady; }
private:
void UnpackRawBuffer();
CharonClient m_client;
bool m_isEventReady;
std::vector<uint8_t> m_rawBuffer;
SpecEvent m_event;
std::vector<SpecEvent> m_readyEvents;

View File

@ -0,0 +1,143 @@
#include "RitualClient.h"
namespace Specter {
RitualClient::RitualClient(const std::string& hostname, const std::string& port) :
m_socket(m_context), m_deadline(m_context)
{
Connect(hostname, port);
}
RitualClient::~RitualClient()
{
Disconnect();
}
bool RitualClient::GetData(RitualMessage& reciever)
{
if (m_queue.IsEmpty())
return false;
reciever = m_queue.Front();
m_queue.PopFront();
return true;
}
void RitualClient::Connect(const std::string& hostname, const std::string& port)
{
try
{
asio::ip::tcp::resolver resolver(m_context);
auto end_points = resolver.resolve(hostname, port);
m_deadline.expires_after(std::chrono::seconds(30));
asio::async_connect(m_socket, end_points,
[this, hostname, port](std::error_code ec, asio::ip::tcp::endpoint endpoint)
{
if (!ec)
{
SPEC_INFO("Connected RitualClient to {0}:{1}", endpoint.address(), endpoint.port());
//Turn off our deadline timer
m_deadline.cancel();
ReadSize();
}
else
{
SPEC_WARN("Unable to connect to RitualClient {0}:{1}", hostname, port);
m_socket.close();
}
}
);
m_deadline.async_wait(std::bind(&RitualClient::HandleTimeout, this, std::placeholders::_1));
m_ioThread = std::thread([this]() { m_context.run(); });
}
catch (asio::system_error& e)
{
SPEC_WARN("Unable to connect RitualClient to {0}:{1}", hostname, port);
return;
}
}
void RitualClient::Disconnect()
{
if (IsConnected())
{
asio::post(m_context, [this]() { m_socket.close(); });
}
m_context.stop();
if (m_ioThread.joinable())
m_ioThread.join();
}
void RitualClient::ReadSize()
{
asio::async_read(m_socket, asio::buffer(&m_tempMessage.size, sizeof(m_tempMessage.size)),
[this](std::error_code ec, std::size_t size)
{
if (!ec)
{
m_tempMessage.body.resize(m_tempMessage.size - s_minimumMessageSize);
ReadHitSize();
}
}
);
}
void RitualClient::ReadHitSize()
{
asio::async_read(m_socket, asio::buffer(&m_tempMessage.hitSize, sizeof(m_tempMessage.hitSize)),
[this](std::error_code ec, std::size_t size)
{
if (!ec)
{
ReadDataType();
}
}
);
}
void RitualClient::ReadDataType()
{
asio::async_read(m_socket, asio::buffer(&m_tempMessage.dataType, sizeof(m_tempMessage.dataType)),
[this](std::error_code ec, std::size_t size)
{
if (!ec)
{
ReadBody();
}
}
);
}
void RitualClient::ReadBody()
{
asio::async_read(m_socket, asio::buffer(&m_tempMessage.body, m_tempMessage.body.size()),
[this](std::error_code ec, std::size_t size)
{
if (!ec && size > 0)
{
m_queue.PushBack(m_tempMessage);
}
ReadSize();
}
);
}
void RitualClient::HandleTimeout(const asio::error_code& ec)
{
//If we stop the timer, don't do anything
if (ec == asio::error::operation_aborted)
{
return;
}
//Check to make sure that deadline wasn't asychronously moved
if (m_deadline.expiry() <= asio::steady_timer::clock_type::now())
{
SPEC_WARN("RitualClient timedout at Connect!");
m_socket.close();
m_deadline.expires_at(asio::steady_timer::time_point::max());
}
}
}

View File

@ -0,0 +1,54 @@
#ifndef RITUAL_CLIENT_H
#define RITUAL_CLIENT_H
#include "Specter/Utils/ThreadSafeQueue.h"
#include "Specter/Physics/SpecData.h"
#include <asio.hpp>
#include <thread>
namespace Specter {
struct RitualMessage
{
uint64_t size; //Inclusive size of whole message
uint64_t hitSize; //Size of an individual CoMPASS data hit within the body
uint16_t dataType; //CAEN header (CAEx)
std::vector<uint8_t> body; //Data body
};
class RitualClient
{
public:
RitualClient(const std::string& hostname, const std::string& port);
~RitualClient();
bool GetData(RitualMessage& reciever);
void Connect(const std::string& hostname, const std::string& port);
void Disconnect();
const bool IsConnected() const { return m_socket.is_open(); }
static constexpr uint64_t MinimumMessageSize() { return s_minimumMessageSize; }
private:
void ReadSize();
void ReadHitSize();
void ReadDataType();
void ReadBody();
void HandleTimeout(const asio::error_code& ec);
asio::io_context m_context;
asio::ip::tcp::socket m_socket;
asio::steady_timer m_deadline;
std::thread m_ioThread;
RitualMessage m_tempMessage;
ThreadSafeQueue<RitualMessage> m_queue;
//All messages have a minimum size of size + hitSize + dataType
static constexpr uint64_t s_minimumMessageSize = 64 + 64 + 16;
};
}
#endif

View File

@ -0,0 +1,73 @@
#include "RitualOnlineSource.h"
#include "../Caen/CompassHit.h"
#include "Specter/Utils/Functions.h"
namespace Specter {
RitualOnlineSource::RitualOnlineSource(const std::string& hostname, const std::string& port, uint64_t coincidenceWindow) :
DataSource(coincidenceWindow), m_client(hostname, port)
{
m_eventBuilder.SetSortFlag(true);
m_validFlag = m_client.IsConnected();
}
RitualOnlineSource::~RitualOnlineSource()
{
}
void RitualOnlineSource::ProcessData()
{
if (!m_client.IsConnected())
m_validFlag = false;
if (m_client.GetData(m_recievedMessage))
{
ReadMessage();
}
}
void RitualOnlineSource::ReadMessage()
{
uint64_t nHits = m_recievedMessage.body.size() / m_recievedMessage.hitSize;
uint64_t hitsRead = 0;
uint8_t* bodyPointer = m_recievedMessage.body.data();
CompassHit currentHit;
SpecData convertedHit;
while (hitsRead < nHits)
{
currentHit.board = *((uint16_t*)bodyPointer);
bodyPointer += 2;
currentHit.channel = *((uint16_t*)bodyPointer);
bodyPointer += 2;
currentHit.timestamp = *((uint64_t*)bodyPointer);
bodyPointer += 8;
if (Compass_IsEnergy(m_recievedMessage.dataType))
{
currentHit.energy = *((uint16_t*)bodyPointer);
bodyPointer += 2;
}
if (Compass_IsEnergyCalibrated(m_recievedMessage.dataType))
{
currentHit.energyCalibrated = *((uint16_t*)bodyPointer);
bodyPointer += 8;
}
if (Compass_IsEnergyShort(m_recievedMessage.dataType))
{
currentHit.energyShort = *((uint16_t*)bodyPointer);
bodyPointer += 2;
}
currentHit.flags = *((uint32_t*)bodyPointer);
bodyPointer += 4;
hitsRead++;
convertedHit.id = Utilities::GetBoardChannelUUID(currentHit.board, currentHit.channel);
convertedHit.timestamp = currentHit.timestamp;
convertedHit.longEnergy = currentHit.energy;
convertedHit.shortEnergy = currentHit.energyShort;
convertedHit.calEnergy = currentHit.energyCalibrated;
m_eventBuilder.AddDatum(convertedHit);
}
}
}

View File

@ -0,0 +1,32 @@
#ifndef RITUAL_ONLINE_SOURCE_H
#define RITUAL_ONLINE_SOURCE_H
#include "Specter/Physics/DataSource.h"
#include "RitualClient.h"
namespace Specter {
class RitualOnlineSource : public DataSource
{
public:
RitualOnlineSource(const std::string& hostname, const std::string& port, uint64_t coincidenceWindow);
virtual ~RitualOnlineSource();
virtual void ProcessData() override;
virtual std::vector<SpecEvent> GetEvents() override
{
auto temp = m_eventBuilder.GetReadyEvents();
m_eventBuilder.ClearReadyEvents();
return temp;
}
virtual const bool IsEventReady() const override { return m_eventBuilder.IsEventReady(); }
private:
void ReadMessage();
RitualClient m_client;
RitualMessage m_recievedMessage;
};
}
#endif