From 6f647bf3708d524f495f42fee1a266b8aeda95ba Mon Sep 17 00:00:00 2001 From: Gordon McCann Date: Thu, 29 Sep 2022 20:36:44 -0400 Subject: [PATCH] Adding the code --- include/DYListData.h | 50 +++++++++++++ include/DaqGrimoire.h | 10 +++ include/FileIO/DYFileReader.h | 123 ++++++++++++++++++++++++++++++ include/NetIO/DYClient.h | 129 ++++++++++++++++++++++++++++++++ include/NetIO/DYMessage.h | 19 +++++ include/NetIO/ThreadSafeQueue.h | 128 +++++++++++++++++++++++++++++++ 6 files changed, 459 insertions(+) create mode 100644 include/DYListData.h create mode 100644 include/DaqGrimoire.h create mode 100644 include/FileIO/DYFileReader.h create mode 100644 include/NetIO/DYClient.h create mode 100644 include/NetIO/DYMessage.h create mode 100644 include/NetIO/ThreadSafeQueue.h diff --git a/include/DYListData.h b/include/DYListData.h new file mode 100644 index 0000000..6aec970 --- /dev/null +++ b/include/DYListData.h @@ -0,0 +1,50 @@ +#ifndef DY_LIST_DATA_H +#define DY_LIST_DATA_H + +#include +#include + +namespace DaqGrimoire { + + namespace Data + { + static constexpr std::size_t Size = 24; + } + + struct DYListData + { + uint16_t board; + uint16_t channel; + uint64_t timestamp; + uint32_t energy; + uint32_t energyShort; + uint32_t flags; + }; + + namespace Utils + { + void GetDataEventFromBuffer(char* bufferIter, DYListData& dataEvent) + { + dataEvent.board = *((uint16_t*)bufferIter); + bufferIter += 2; + + dataEvent.channel = *((uint16_t*)bufferIter); + bufferIter += 2; + + dataEvent.timestamp = *((uint64_t*)bufferIter); + bufferIter += 8; + + dataEvent.energy = *((uint32_t*)bufferIter); + bufferIter += 4; + + dataEvent.energyShort = *((uint32_t*)bufferIter); + bufferIter += 4; + + dataEvent.flags = *((uint32_t*)bufferIter); + bufferIter += 4; + } + } + +} + +#endif \ No newline at end of file diff --git a/include/DaqGrimoire.h b/include/DaqGrimoire.h new file mode 100644 index 0000000..c883242 --- /dev/null +++ b/include/DaqGrimoire.h @@ -0,0 +1,10 @@ +#ifndef DAQ_GRIMOIRE_H +#define DAQ_GRIMOIRE_H + +#include "FileIO/DYFileReader.h" + +#ifdef DG_HAS_ASIO +#include "NetIO/DYClient.h" +#endif + +#endif \ No newline at end of file diff --git a/include/FileIO/DYFileReader.h b/include/FileIO/DYFileReader.h new file mode 100644 index 0000000..5b0a184 --- /dev/null +++ b/include/FileIO/DYFileReader.h @@ -0,0 +1,123 @@ +#ifndef DY_FILE_READER_H +#define DY_FILE_READER_H + +#include "../DYListData.h" + +#include +#include +#include +#include + +namespace DaqGrimoire { + + class DYFileReader + { + public: + DYFileReader() : + m_fileHandle(nullptr), m_bufferSizeEvents(200000), m_isEOF(false), m_fileSizeBytes(0), m_fileSizeEvents(0), m_bufferIter(nullptr), m_bufferEnd(nullptr) + { + } + + DYFileReader(std::size_t bufferSize) : + m_fileHandle(nullptr), m_bufferSizeEvents(bufferSize), m_isEOF(false), m_fileSizeBytes(0), m_fileSizeEvents(0), m_bufferIter(nullptr), m_bufferEnd(nullptr) + { + } + + DYFileReader(const std::filesystem::path& filepath, std::size_t bufferSize = 200000) : + m_fileHandle(nullptr), m_bufferSizeEvents(bufferSize), m_isEOF(false), m_fileSizeBytes(0), m_fileSizeEvents(0), m_bufferIter(nullptr), m_bufferEnd(nullptr) + { + Open(filepath); + } + + ~DYFileReader() + { + Close(); + } + + void Open(const std::filesystem::path& filepath) + { + m_filepath = filepath; + m_fileHandle->open(filepath, std::ios::binary | std::ios::in); + + m_fileHandle->seekg(0, std::ios_base::end); + m_fileSizeBytes = m_fileHandle->tellg(); + + //Replace this + if (m_fileSizeBytes == 0 || m_fileSizeBytes < Data::Size) + { + m_isEOF = true; + return; + } + else if (m_fileSizeBytes % Data::Size != 0) + { + m_isEOF = true; + return; + } + + m_fileSizeEvents = m_fileSizeBytes / Data::Size; + m_rawBuffer.resize(m_bufferSizeEvents * Data::Size); + } + + void Close() + { + if (IsOpen()) + m_fileHandle->close(); + } + + bool GetNextEvent(DYListData& dataEvent) + { + if (!IsOpen() || IsEOF()) + return false; + + if (m_bufferIter == m_bufferEnd) + { + FillBuffer(); + if (IsEOF()) + return false; + } + + Utils::GetDataEventFromBuffer(m_bufferIter, dataEvent); + + return true; + } + + + const bool IsOpen() const { return m_fileHandle == nullptr ? false : m_fileHandle->is_open(); } + const bool IsEOF() const { return m_isEOF; } + const std::size_t GetFileSizeBytes() const { return m_fileSizeBytes; } + const std::size_t GetFileSizeEvents() const { return m_fileSizeEvents; } + + private: + void FillBuffer() + { + if (m_fileHandle->eof()) + { + m_isEOF = true; + return; + } + + m_fileHandle->read(m_rawBuffer.data(), m_rawBuffer.size()); + + m_bufferIter = m_rawBuffer.data(); + m_bufferEnd = m_bufferIter + m_fileHandle->gcount(); //one past the last datum + } + + + + std::filesystem::path m_filepath; + std::shared_ptr m_fileHandle; + + std::vector m_rawBuffer; + std::size_t m_bufferSizeEvents; //in units of data events + + std::size_t m_fileSizeBytes; //in bytes + std::size_t m_fileSizeEvents; //in data events + + bool m_isEOF; + + char* m_bufferIter; + char* m_bufferEnd; + }; +} + +#endif \ No newline at end of file diff --git a/include/NetIO/DYClient.h b/include/NetIO/DYClient.h new file mode 100644 index 0000000..2dfe297 --- /dev/null +++ b/include/NetIO/DYClient.h @@ -0,0 +1,129 @@ +#ifndef DY_CLIENT_H +#define DY_CLIENT_H + +#include "DYMessage.h" +#include "ThreadSafeQueue.h" + +#include "asio.hpp" + +namespace DaqGrimoire { + + class DYClient + { + public: + DYClient(const std::string& address, const std::string& port) : + m_socket(m_context) + { + Connect(address, port); + } + + ~DYClient() + { + Disconnect(); + } + + bool GetNextEvent(DYListData& dataEvent) + { + if (m_dataQueue.IsEmpty()) + return false; + + dataEvent = m_dataQueue.Front(); + m_dataQueue.PopFront(); + return true; + } + + void Connect(const std::string& address, const std::string& port) + { + try + { + asio::ip::tcp::resolver resolver(m_context); + auto end_points = resolver.resolve(address, port); + asio::async_connect(m_socket, end_points, + [this](std::error_code ec, asio::ip::tcp::endpoint endpoint) + { + if (!ec) + { + ReadHeader(); + } + } + ); + + m_asioThread = std::thread([this]() { m_context.run() }); + } + catch (asio::system_error& e) + { + continue; + } + } + + void Disconnect() + { + if (IsConnected()) + { + asio::post(m_context, [this]() { m_socket.close(); }) + } + + m_context.stop(); + if (m_asioThread.joinable()) + m_asioThread.join(); + } + + bool IsConnected() { return m_socket.is_open(); } + + private: + + + void ReadHeader() + { + asio::async_read(m_socket, asio::buffer(&m_tempMessage.size, sizeof(m_tempMessage.size)), + [this](std::error_code ec, std::size_t size) + { + if (!ec) + { + if (m_tempMessage.size > 0) + { + m_tempMessage.body.resize(m_tempMessage.size); + ReadBody(); + } + else + ReadHeader(); + } + } + ); + } + + void ReadBody() + { + asio::async_read(m_socket, asio::buffer(m_tempMessage.body.data(), m_tempMessage.body.size()), + [this](std::error_code ec, std::size_t size) + { + if (!ec) + { + char* dataPtr = m_tempMessage.body.data(); + char* endPtr = m_tempMessage.body.data() + m_tempMessage.body.size(); + DYListData tempData; + while (dataPtr != endPtr) + { + Utils::GetDataEventFromBuffer(dataPtr, tempData); + m_dataQueue.PushBack(tempData); + } + + ReadHeader(); + } + } + ); + } + + asio::io_context m_context; + asio::ip::tcp::socket m_socket; + + std::thread m_asioThread; + + DYMessage m_tempMessage; + + ThreadSafeQueue m_dataQueue; + }; +} + + +#endif \ No newline at end of file diff --git a/include/NetIO/DYMessage.h b/include/NetIO/DYMessage.h new file mode 100644 index 0000000..4e0ce3e --- /dev/null +++ b/include/NetIO/DYMessage.h @@ -0,0 +1,19 @@ +#ifndef DY_MESSAGE_H +#define DY_MESSAGE_H + +#include "../DYListData.h" + +#include + +namespace DaqGrimoire { + + struct DYMessage + { + DYMessage() = default; + + uint64_t size; + std::vector body; + }; +} + +#endif \ No newline at end of file diff --git a/include/NetIO/ThreadSafeQueue.h b/include/NetIO/ThreadSafeQueue.h new file mode 100644 index 0000000..139dc0f --- /dev/null +++ b/include/NetIO/ThreadSafeQueue.h @@ -0,0 +1,128 @@ +#ifndef THREAD_SAFE_QUEUE_H +#define THREAD_SAFE_QUEUE_H + + +#include +#include +#include +#include +#include + +namespace DaqGrimoire { + + template + class ThreadSafeQueue + { + public: + ThreadSafeQueue() = default; + ThreadSafeQueue(const ThreadSafeQueue&) = delete; //no copy + ~ThreadSafeQueue() { Clear(); } + + void PushBack(const T& data) + { + std::scoped_lock guard(m_queueMutex); + m_queue.push_back(data); + + std::scoped_lock condGuard(m_conditionMutex); + m_conditional.notify_one(); + } + + void PushFront(const T& data) + { + std::scoped_lock guard(m_queueMutex); + m_queue.push_front(data); + + std::scoped_lock conditionGuard(m_conditionMutex); + m_conditional.notify_one(); + } + + void PopBack() + { + std::scoped_lock guard(m_queueMutex); + m_queue.pop_back(); + } + + void PopFront() + { + std::scoped_lock guard(m_queueMutex); + m_queue.pop_front(); + } + + const T& Front() + { + std::scoped_lock guard(m_queueMutex); + return m_queue.front(); + } + + const T& Back() + { + std::scoped_lock guard(m_queueMutex); + return m_queue.back(); + } + + std::size_t Size() + { + std::scoped_lock guard(m_queueMutex); + return m_queue.size(); + } + + bool IsEmpty() + { + std::scoped_lock guard(m_queueMutex); + return m_queue.empty(); + } + + void Clear() + { + std::scoped_lock guard(m_queueMutex); + m_queue.clear(); + } + + //For iterator loops, need begin()/end() idiom + + std::deque::iterator begin() + { + std::scoped_lock guard(m_queueMutex); + return m_queue.begin(); + } + + std::deque::iterator end() + { + std::scoped_lock guard(m_queueMutex); + return m_queue.end(); + } + + void Wait() + { + while (IsEmpty() && !m_isForceWakeup) + { + std::unique_lock guard(m_conditionMutex); + m_conditional.wait(guard); + } + } + + void ForceWakeup() + { + m_isForceWakeup = true; + + std::unique_lock guard(m_conditionMutex); + m_conditional.notify_one(); + } + + void ResetWakeup() + { + m_isForceWakeup = false; + } + + private: + std::mutex m_queueMutex; + std::deque m_queue; + + std::mutex m_conditionMutex; + std::condition_variable m_conditional; + + std::atomic m_isForceWakeup = false; + }; +} + +#endif \ No newline at end of file