diff --git a/input.txt b/input.txt index 7922dc8..7579ed3 100644 --- a/input.txt +++ b/input.txt @@ -1,5 +1,6 @@ ----------Data Information---------- OutputFile: /media/data/gwm17/mask_tests/10B3Hea_16800keV_5Lia_74B.root +NumberOfThreads: 6 ----------Reaction Information---------- NumberOfSamples: 100000 begin_chain diff --git a/src/Kinematics/main.cpp b/src/Kinematics/main.cpp index 2cec2fd..03798b2 100644 --- a/src/Kinematics/main.cpp +++ b/src/Kinematics/main.cpp @@ -24,6 +24,7 @@ int main(int argc, char** argv) return 1; } calculator.Run(); + //calculator.RunSingleThread(); } catch(const std::exception& e) { diff --git a/src/Mask/CMakeLists.txt b/src/Mask/CMakeLists.txt index ec14d3f..d08879f 100644 --- a/src/Mask/CMakeLists.txt +++ b/src/Mask/CMakeLists.txt @@ -50,7 +50,13 @@ target_sources(Mask PRIVATE ThreeStepSystem.h TwoStepSystem.cpp TwoStepSystem.h + ThreadPool.h + FileWriter.h + FileWriter.cpp ) -target_link_libraries(Mask catima MaskDict ${ROOT_LIBRARIES}) +set(THREADS_PREFER_PTHREAD_FLAG On) +find_package(Threads REQUIRED) + +target_link_libraries(Mask catima MaskDict ${ROOT_LIBRARIES} Threads::Threads) set_target_properties(Mask PROPERTIES ARCHIVE_OUTPUT_DIRECTORY ${MASK_LIBRARY_DIR}) \ No newline at end of file diff --git a/src/Mask/FileWriter.cpp b/src/Mask/FileWriter.cpp new file mode 100644 index 0000000..3ad7da7 --- /dev/null +++ b/src/Mask/FileWriter.cpp @@ -0,0 +1,75 @@ +#include "FileWriter.h" + +namespace Mask { + + FileWriter::FileWriter() : + m_file(nullptr), m_tree(nullptr), m_queueSize(0) + { + } + + FileWriter::FileWriter(const std::string& filename, const std::string& treename) : + m_file(nullptr), m_tree(nullptr) + { + m_file = TFile::Open(filename.c_str(), "RECREATE"); + if(m_file != nullptr && m_file->IsOpen()) + { + m_tree = new TTree(treename.c_str(), treename.c_str()); + m_tree->Branch("nuclei", &m_dataHandle); + } + } + + FileWriter::~FileWriter() + { + Close(); + } + + void FileWriter::Open(const std::string& filename, const std::string& treename) + { + if(m_file != nullptr || m_tree != nullptr) + Close(); + + m_file = TFile::Open(filename.c_str(), "RECREATE"); + if(m_file != nullptr && m_file->IsOpen()) + { + m_tree = new TTree(treename.c_str(), treename.c_str()); + m_tree->Branch("nuclei", &m_dataHandle); + } + } + + void FileWriter::Close() + { + if(m_file->IsOpen()) + { + if(m_tree != nullptr) + m_tree->Write(m_tree->GetName(), TObject::kOverwrite); + + m_file->Close(); + delete m_file; + m_file = nullptr; + } + } + + void FileWriter::PushData(const std::vector& data) + { + std::scoped_lock guard(m_queueMutex); + m_queue.push(data); + ++m_queueSize; + } + + bool FileWriter::Write() + { + if(m_queueSize == 0) + return false; + + //Aquire lock for as short a time as possible + { + std::scoped_lock guard(m_queueMutex); + m_dataHandle = m_queue.front(); + m_queue.pop(); + } + --m_queueSize; + + m_tree->Fill(); + return true; + } +} \ No newline at end of file diff --git a/src/Mask/FileWriter.h b/src/Mask/FileWriter.h new file mode 100644 index 0000000..c424965 --- /dev/null +++ b/src/Mask/FileWriter.h @@ -0,0 +1,45 @@ +#ifndef FILE_WRITER_H +#define FILE_WRITER_H + +#include "Nucleus.h" + +#include "TFile.h" +#include "TTree.h" + +#include +#include +#include + +namespace Mask { + + class FileWriter + { + public: + FileWriter(); + FileWriter(const std::string& filename, const std::string& treename); + ~FileWriter(); + + bool IsOpen() const { return m_file->IsOpen(); } + bool IsTree() const { return m_tree == nullptr ? false : true; } + + std::size_t GetQueueSize() const { return m_queueSize; } //Implicitly thread-safe + + void PushData(const std::vector& data); //Thread-safe + bool Write(); //Not thread safe! + + void Open(const std::string& filename, const std::string& treename); + void Close(); //Not thread safe! + + private: + TFile* m_file; + TTree* m_tree; + + std::vector m_dataHandle; + + std::mutex m_queueMutex; + std::atomic m_queueSize; + std::queue> m_queue; + }; +} + +#endif \ No newline at end of file diff --git a/src/Mask/MaskApp.cpp b/src/Mask/MaskApp.cpp index bc27127..32fdf31 100644 --- a/src/Mask/MaskApp.cpp +++ b/src/Mask/MaskApp.cpp @@ -8,7 +8,7 @@ namespace Mask { MaskApp::MaskApp() : - m_system(nullptr) + m_system(nullptr), m_resources(nullptr) { std::cout<<"----------Monte Carlo Simulation of Kinematics----------"<>junk>>m_outputName; + input>>junk>>m_nthreads; std::getline(input, junk); std::getline(input, junk); input>>junk>>m_nsamples; + std::cout<<"Allocating resources... Asking for " << m_nthreads << " threads..."; + m_resources = std::make_unique(m_nthreads); + std::cout<<" Complete."< params; int z, a; while(input>>junk) @@ -98,10 +108,20 @@ namespace Mask { m_system = CreateSystem(params); if(m_system == nullptr || !m_system->IsValid()) { - std::cerr<<"Param size: "<IsValid()) + { + std::cerr<<"Failure to parse reaction system... configuration not loaded."<SetLayeredTarget(target); - std::cout<<"Outputing data to file: "<SetLayeredTarget(target); + } + + std::cout<<"Reaction equation: "<GetSystemEquation()<PushJob({std::bind(&MaskApp::RunChunk, std::ref(*this), std::placeholders::_1), system}); + + uint64_t count = 0; + double percent = 0.05; + uint64_t flushVal = m_nsamples*percent; + uint64_t flushCount = 0; + while(true) + { + if(count == flushVal) + { + count = 0; + ++flushCount; + std::cout<<"\rPercent of data written to disk: "<IsFinished() && m_fileWriter.GetQueueSize() == 0) + break; + else if(m_fileWriter.Write()) + ++count; + } + + std::cout<Branch("nuclei", m_system->GetNuclei()); //For progress tracking - uint32_t percent5 = 0.05*m_nsamples; - uint32_t count = 0; - uint32_t npercent = 0; + uint64_t percent5 = 0.05*m_nsamples; + uint64_t count = 0; + uint64_t npercent = 0; - for(uint32_t i=0; iRunSystem(); + m_fileWriter.PushData(*(system->GetNuclei())); + } + } + } diff --git a/src/Mask/MaskApp.h b/src/Mask/MaskApp.h index 62cbead..8b86cf5 100644 --- a/src/Mask/MaskApp.h +++ b/src/Mask/MaskApp.h @@ -7,6 +7,10 @@ #include "TwoStepSystem.h" #include "ThreeStepSystem.h" #include "RxnType.h" +#include "ThreadPool.h" +#include "FileWriter.h" + +#include namespace Mask { @@ -17,18 +21,23 @@ namespace Mask { ~MaskApp(); bool LoadConfig(const std::string& filename); bool SaveConfig(const std::string& filename); - int GetNumberOfSamples() const { return m_nsamples; } - const std::string GetSystemName() const { return m_system == nullptr ? "Invalid System" : m_system->GetSystemEquation(); } - const std::string GetOutputName() const { return m_outputName; } - const RxnType GetReactionType() const { return m_rxnType; } void Run(); + void RunSingleThread(); private: + void RunChunk(ReactionSystem* system); + ReactionSystem* m_system; + std::string m_outputName; RxnType m_rxnType; - int m_nsamples; + uint64_t m_nsamples; + uint32_t m_nthreads; + + std::vector m_systemList; //One system for each thread + FileWriter m_fileWriter; + std::unique_ptr m_resources; }; diff --git a/src/Mask/RandomGenerator.cpp b/src/Mask/RandomGenerator.cpp index 1153103..df96cf6 100644 --- a/src/Mask/RandomGenerator.cpp +++ b/src/Mask/RandomGenerator.cpp @@ -1,8 +1,7 @@ #include "RandomGenerator.h" namespace Mask { - RandomGenerator* RandomGenerator::s_instance = new RandomGenerator(); - + RandomGenerator::RandomGenerator() { std::random_device rd; diff --git a/src/Mask/RandomGenerator.h b/src/Mask/RandomGenerator.h index fd8a0b2..f23ac6b 100644 --- a/src/Mask/RandomGenerator.h +++ b/src/Mask/RandomGenerator.h @@ -10,12 +10,15 @@ namespace Mask { public: ~RandomGenerator(); std::mt19937& GetGenerator() { return rng; } - static RandomGenerator& GetInstance() { return *s_instance; } + static RandomGenerator& GetInstance() + { + static thread_local RandomGenerator s_instance; + return s_instance; + } private: RandomGenerator(); - static RandomGenerator* s_instance; std::mt19937 rng; }; diff --git a/src/Mask/ThreadPool.h b/src/Mask/ThreadPool.h new file mode 100644 index 0000000..babfaa4 --- /dev/null +++ b/src/Mask/ThreadPool.h @@ -0,0 +1,115 @@ +/* + ThreadPool.h + Simple thread pool implementation. Jobs are q'd, and as threads are made available, jobs are dispatched to these threads. + + GWM -- Aug 2022 +*/ +#ifndef THREAD_POOL_H +#define THREAD_POOL_H + +#include "ReactionSystem.h" + +#include +#include +#include +#include +#include +#include +#include + + +namespace Mask { + + using JobFunction = std::function; + struct Job + { + JobFunction func; + ReactionSystem* argument; + }; + + class ThreadPool + { + public: + ThreadPool(uint32_t nthreads) : + m_isStopped(false), m_numberRunning(0), m_queueSize(0), m_initShutdown(false) + { + for(uint32_t i=0; i guard(m_poolMutex); + m_queue.push(job); + m_queueSize++; + } + m_wakeCondition.notify_one(); + } + + bool IsFinished() + { + return m_numberRunning == 0 && m_queueSize == 0; + } + + void Shutdown() + { + m_initShutdown = true; + m_wakeCondition.notify_all(); + for(auto& thread : m_pool) + { + thread.join(); + } + + m_pool.clear(); + m_isStopped = true; + } + + private: + void ExecuteJob() + { + while(true) + { + Job job; + { + std::unique_lock guard(m_poolMutex); + m_wakeCondition.wait(guard, [this](){ + return (!m_queue.empty() || m_initShutdown); + }); + if(m_initShutdown) + return; + + job = m_queue.front(); + m_queue.pop(); + } + + //Change number running first so that no crash + m_numberRunning++; + m_queueSize--; + + job.func(job.argument); + m_numberRunning--; + } + + } + + std::queue> m_queue; + std::vector m_pool; + std::mutex m_poolMutex; + std::condition_variable m_wakeCondition; + + bool m_isStopped; + std::atomic m_numberRunning; + std::atomic m_queueSize; + std::atomic m_initShutdown; + }; +} + +#endif \ No newline at end of file