1
0
Fork 0
mirror of https://github.com/gwm17/Mask.git synced 2024-11-22 18:28:51 -05:00

Overhaul for multi-threading. Needs some testing to validate, but first signs are good.

This commit is contained in:
Gordon McCann 2022-09-02 17:14:30 -04:00
parent 76ae86d595
commit 0456f230f5
10 changed files with 348 additions and 20 deletions

View File

@ -1,5 +1,6 @@
----------Data Information---------- ----------Data Information----------
OutputFile: /media/data/gwm17/mask_tests/10B3Hea_16800keV_5Lia_74B.root OutputFile: /media/data/gwm17/mask_tests/10B3Hea_16800keV_5Lia_74B.root
NumberOfThreads: 6
----------Reaction Information---------- ----------Reaction Information----------
NumberOfSamples: 100000 NumberOfSamples: 100000
begin_chain begin_chain

View File

@ -24,6 +24,7 @@ int main(int argc, char** argv)
return 1; return 1;
} }
calculator.Run(); calculator.Run();
//calculator.RunSingleThread();
} }
catch(const std::exception& e) catch(const std::exception& e)
{ {

View File

@ -50,7 +50,13 @@ target_sources(Mask PRIVATE
ThreeStepSystem.h ThreeStepSystem.h
TwoStepSystem.cpp TwoStepSystem.cpp
TwoStepSystem.h TwoStepSystem.h
ThreadPool.h
FileWriter.h
FileWriter.cpp
) )
target_link_libraries(Mask catima MaskDict ${ROOT_LIBRARIES}) set(THREADS_PREFER_PTHREAD_FLAG On)
find_package(Threads REQUIRED)
target_link_libraries(Mask catima MaskDict ${ROOT_LIBRARIES} Threads::Threads)
set_target_properties(Mask PROPERTIES ARCHIVE_OUTPUT_DIRECTORY ${MASK_LIBRARY_DIR}) set_target_properties(Mask PROPERTIES ARCHIVE_OUTPUT_DIRECTORY ${MASK_LIBRARY_DIR})

75
src/Mask/FileWriter.cpp Normal file
View File

@ -0,0 +1,75 @@
#include "FileWriter.h"
namespace Mask {
FileWriter::FileWriter() :
m_file(nullptr), m_tree(nullptr), m_queueSize(0)
{
}
FileWriter::FileWriter(const std::string& filename, const std::string& treename) :
m_file(nullptr), m_tree(nullptr)
{
m_file = TFile::Open(filename.c_str(), "RECREATE");
if(m_file != nullptr && m_file->IsOpen())
{
m_tree = new TTree(treename.c_str(), treename.c_str());
m_tree->Branch("nuclei", &m_dataHandle);
}
}
FileWriter::~FileWriter()
{
Close();
}
void FileWriter::Open(const std::string& filename, const std::string& treename)
{
if(m_file != nullptr || m_tree != nullptr)
Close();
m_file = TFile::Open(filename.c_str(), "RECREATE");
if(m_file != nullptr && m_file->IsOpen())
{
m_tree = new TTree(treename.c_str(), treename.c_str());
m_tree->Branch("nuclei", &m_dataHandle);
}
}
void FileWriter::Close()
{
if(m_file->IsOpen())
{
if(m_tree != nullptr)
m_tree->Write(m_tree->GetName(), TObject::kOverwrite);
m_file->Close();
delete m_file;
m_file = nullptr;
}
}
void FileWriter::PushData(const std::vector<Nucleus>& data)
{
std::scoped_lock<std::mutex> guard(m_queueMutex);
m_queue.push(data);
++m_queueSize;
}
bool FileWriter::Write()
{
if(m_queueSize == 0)
return false;
//Aquire lock for as short a time as possible
{
std::scoped_lock<std::mutex> guard(m_queueMutex);
m_dataHandle = m_queue.front();
m_queue.pop();
}
--m_queueSize;
m_tree->Fill();
return true;
}
}

45
src/Mask/FileWriter.h Normal file
View File

@ -0,0 +1,45 @@
#ifndef FILE_WRITER_H
#define FILE_WRITER_H
#include "Nucleus.h"
#include "TFile.h"
#include "TTree.h"
#include <queue>
#include <mutex>
#include <atomic>
namespace Mask {
class FileWriter
{
public:
FileWriter();
FileWriter(const std::string& filename, const std::string& treename);
~FileWriter();
bool IsOpen() const { return m_file->IsOpen(); }
bool IsTree() const { return m_tree == nullptr ? false : true; }
std::size_t GetQueueSize() const { return m_queueSize; } //Implicitly thread-safe
void PushData(const std::vector<Nucleus>& data); //Thread-safe
bool Write(); //Not thread safe!
void Open(const std::string& filename, const std::string& treename);
void Close(); //Not thread safe!
private:
TFile* m_file;
TTree* m_tree;
std::vector<Nucleus> m_dataHandle;
std::mutex m_queueMutex;
std::atomic<std::size_t> m_queueSize;
std::queue<std::vector<Nucleus>> m_queue;
};
}
#endif

View File

@ -8,7 +8,7 @@
namespace Mask { namespace Mask {
MaskApp::MaskApp() : MaskApp::MaskApp() :
m_system(nullptr) m_system(nullptr), m_resources(nullptr)
{ {
std::cout<<"----------Monte Carlo Simulation of Kinematics----------"<<std::endl; std::cout<<"----------Monte Carlo Simulation of Kinematics----------"<<std::endl;
} }
@ -16,6 +16,8 @@ namespace Mask {
MaskApp::~MaskApp() MaskApp::~MaskApp()
{ {
delete m_system; delete m_system;
for(std::size_t i=0; i<m_systemList.size(); i++)
delete m_systemList[i];
} }
bool MaskApp::LoadConfig(const std::string& filename) bool MaskApp::LoadConfig(const std::string& filename)
@ -31,10 +33,18 @@ namespace Mask {
std::string junk; std::string junk;
std::getline(input, junk); std::getline(input, junk);
input>>junk>>m_outputName; input>>junk>>m_outputName;
input>>junk>>m_nthreads;
std::getline(input, junk); std::getline(input, junk);
std::getline(input, junk); std::getline(input, junk);
input>>junk>>m_nsamples; input>>junk>>m_nsamples;
std::cout<<"Allocating resources... Asking for " << m_nthreads << " threads...";
m_resources = std::make_unique<ThreadPool>(m_nthreads);
std::cout<<" Complete."<<std::endl;
std::cout<<"Outputing data to file: " << m_outputName <<std::endl;
m_fileWriter.Open(m_outputName, "SimTree");
std::vector<StepParameters> params; std::vector<StepParameters> params;
int z, a; int z, a;
while(input>>junk) while(input>>junk)
@ -98,10 +108,20 @@ namespace Mask {
m_system = CreateSystem(params); m_system = CreateSystem(params);
if(m_system == nullptr || !m_system->IsValid()) if(m_system == nullptr || !m_system->IsValid())
{ {
std::cerr<<"Param size: "<<params.size()<<std::endl;
std::cerr<<"Failure to parse reaction system... configuration not loaded."<<std::endl; std::cerr<<"Failure to parse reaction system... configuration not loaded."<<std::endl;
return false; return false;
} }
for(uint32_t i=0; i<m_nthreads; i++)
{
m_systemList.push_back(CreateSystem(params));
if(m_systemList.back() == nullptr || !m_systemList.back()->IsValid())
{
std::cerr<<"Failure to parse reaction system... configuration not loaded."<<std::endl;
return false;
}
}
std::getline(input, junk); std::getline(input, junk);
std::getline(input, junk); std::getline(input, junk);
@ -132,9 +152,13 @@ namespace Mask {
} }
m_system->SetLayeredTarget(target); m_system->SetLayeredTarget(target);
std::cout<<"Outputing data to file: "<<m_outputName<<std::endl; for(auto system : m_systemList)
std::cout<<"Reaction equation: "<<GetSystemName()<<std::endl; {
std::cout<<"Number of samples: "<<GetNumberOfSamples()<<std::endl; system->SetLayeredTarget(target);
}
std::cout<<"Reaction equation: "<<m_system->GetSystemEquation()<<std::endl;
std::cout<<"Number of samples: "<<m_nsamples<<std::endl;
return true; return true;
} }
@ -146,6 +170,42 @@ namespace Mask {
} }
void MaskApp::Run() void MaskApp::Run()
{
std::cout<<"Running simulation..."<<std::endl;
if(m_systemList.size() != m_nthreads)
{
return;
}
//Give our thread pool some tasks
for(auto system : m_systemList)
m_resources->PushJob({std::bind(&MaskApp::RunChunk, std::ref(*this), std::placeholders::_1), system});
uint64_t count = 0;
double percent = 0.05;
uint64_t flushVal = m_nsamples*percent;
uint64_t flushCount = 0;
while(true)
{
if(count == flushVal)
{
count = 0;
++flushCount;
std::cout<<"\rPercent of data written to disk: "<<percent*flushCount*100<<"%"<<std::flush;
}
if(m_resources->IsFinished() && m_fileWriter.GetQueueSize() == 0)
break;
else if(m_fileWriter.Write())
++count;
}
std::cout<<std::endl;
std::cout<<"Complete."<<std::endl;
std::cout<<"---------------------------------------------"<<std::endl;
}
void MaskApp::RunSingleThread()
{ {
std::cout<<"Running simulation..."<<std::endl; std::cout<<"Running simulation..."<<std::endl;
if(m_system == nullptr) if(m_system == nullptr)
@ -158,11 +218,11 @@ namespace Mask {
tree->Branch("nuclei", m_system->GetNuclei()); tree->Branch("nuclei", m_system->GetNuclei());
//For progress tracking //For progress tracking
uint32_t percent5 = 0.05*m_nsamples; uint64_t percent5 = 0.05*m_nsamples;
uint32_t count = 0; uint64_t count = 0;
uint32_t npercent = 0; uint64_t npercent = 0;
for(uint32_t i=0; i<m_nsamples; i++) for(uint64_t i=0; i<m_nsamples; i++)
{ {
if(++count == percent5) if(++count == percent5)
{ {
@ -183,4 +243,18 @@ namespace Mask {
std::cout<<"---------------------------------------------"<<std::endl; std::cout<<"---------------------------------------------"<<std::endl;
} }
void MaskApp::RunChunk(ReactionSystem* system)
{
if(system == nullptr)
return;
uint64_t samples = m_nsamples / m_nthreads;
for(uint64_t i=0; i<samples; i++)
{
system->RunSystem();
m_fileWriter.PushData(*(system->GetNuclei()));
}
}
} }

View File

@ -7,6 +7,10 @@
#include "TwoStepSystem.h" #include "TwoStepSystem.h"
#include "ThreeStepSystem.h" #include "ThreeStepSystem.h"
#include "RxnType.h" #include "RxnType.h"
#include "ThreadPool.h"
#include "FileWriter.h"
#include <memory>
namespace Mask { namespace Mask {
@ -17,18 +21,23 @@ namespace Mask {
~MaskApp(); ~MaskApp();
bool LoadConfig(const std::string& filename); bool LoadConfig(const std::string& filename);
bool SaveConfig(const std::string& filename); bool SaveConfig(const std::string& filename);
int GetNumberOfSamples() const { return m_nsamples; }
const std::string GetSystemName() const { return m_system == nullptr ? "Invalid System" : m_system->GetSystemEquation(); }
const std::string GetOutputName() const { return m_outputName; }
const RxnType GetReactionType() const { return m_rxnType; }
void Run(); void Run();
void RunSingleThread();
private: private:
void RunChunk(ReactionSystem* system);
ReactionSystem* m_system; ReactionSystem* m_system;
std::string m_outputName; std::string m_outputName;
RxnType m_rxnType; RxnType m_rxnType;
int m_nsamples; uint64_t m_nsamples;
uint32_t m_nthreads;
std::vector<ReactionSystem*> m_systemList; //One system for each thread
FileWriter m_fileWriter;
std::unique_ptr<ThreadPool> m_resources;
}; };

View File

@ -1,7 +1,6 @@
#include "RandomGenerator.h" #include "RandomGenerator.h"
namespace Mask { namespace Mask {
RandomGenerator* RandomGenerator::s_instance = new RandomGenerator();
RandomGenerator::RandomGenerator() RandomGenerator::RandomGenerator()
{ {

View File

@ -10,12 +10,15 @@ namespace Mask {
public: public:
~RandomGenerator(); ~RandomGenerator();
std::mt19937& GetGenerator() { return rng; } std::mt19937& GetGenerator() { return rng; }
static RandomGenerator& GetInstance() { return *s_instance; } static RandomGenerator& GetInstance()
{
static thread_local RandomGenerator s_instance;
return s_instance;
}
private: private:
RandomGenerator(); RandomGenerator();
static RandomGenerator* s_instance;
std::mt19937 rng; std::mt19937 rng;
}; };

115
src/Mask/ThreadPool.h Normal file
View File

@ -0,0 +1,115 @@
/*
ThreadPool.h
Simple thread pool implementation. Jobs are q'd, and as threads are made available, jobs are dispatched to these threads.
GWM -- Aug 2022
*/
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include "ReactionSystem.h"
#include <vector>
#include <atomic>
#include <thread>
#include <mutex>
#include <functional>
#include <queue>
#include <condition_variable>
namespace Mask {
using JobFunction = std::function<void(ReactionSystem*)>;
struct Job
{
JobFunction func;
ReactionSystem* argument;
};
class ThreadPool
{
public:
ThreadPool(uint32_t nthreads) :
m_isStopped(false), m_numberRunning(0), m_queueSize(0), m_initShutdown(false)
{
for(uint32_t i=0; i<nthreads; i++)
{
m_pool.push_back(std::thread(std::bind(&ThreadPool::ExecuteJob, std::ref(*this))));
}
}
~ThreadPool()
{
if(!m_isStopped)
Shutdown();
}
void PushJob(Job job)
{
{
std::unique_lock<std::mutex> guard(m_poolMutex);
m_queue.push(job);
m_queueSize++;
}
m_wakeCondition.notify_one();
}
bool IsFinished()
{
return m_numberRunning == 0 && m_queueSize == 0;
}
void Shutdown()
{
m_initShutdown = true;
m_wakeCondition.notify_all();
for(auto& thread : m_pool)
{
thread.join();
}
m_pool.clear();
m_isStopped = true;
}
private:
void ExecuteJob()
{
while(true)
{
Job job;
{
std::unique_lock<std::mutex> guard(m_poolMutex);
m_wakeCondition.wait(guard, [this](){
return (!m_queue.empty() || m_initShutdown);
});
if(m_initShutdown)
return;
job = m_queue.front();
m_queue.pop();
}
//Change number running first so that no crash
m_numberRunning++;
m_queueSize--;
job.func(job.argument);
m_numberRunning--;
}
}
std::queue<Job, std::deque<Job>> m_queue;
std::vector<std::thread> m_pool;
std::mutex m_poolMutex;
std::condition_variable m_wakeCondition;
bool m_isStopped;
std::atomic<int> m_numberRunning;
std::atomic<int> m_queueSize;
std::atomic<bool> m_initShutdown;
};
}
#endif