#include "threadmanager.hpp" inline sprawl::threading::ThreadManager::TaskInfo::TaskInfo() : what(nullptr) , taken(false) { // } inline sprawl::threading::ThreadManager::TaskInfo::TaskInfo(sprawl::threading::ThreadManager::Task&& what_, uint64_t where_, int64_t when_) : what(std::move(what_)) , where(where_) , when(when_) , taken(false) , stage(0) { // } inline sprawl::threading::ThreadManager::TaskInfo::TaskInfo(sprawl::threading::ThreadManager::Task const& what_, uint64_t where_, int64_t when_) : what(what_) , where(where_) , when(when_) , taken(false) , stage(0) { // } inline sprawl::threading::ThreadManager::TaskInfo::TaskInfo(sprawl::threading::ThreadManager::Task&& what_, uint64_t where_, int64_t when_, int64_t stage_) : what(std::move(what_)) , where(where_) , when(when_) , taken(false) , stage(stage_) { // } inline sprawl::threading::ThreadManager::TaskInfo::TaskInfo(sprawl::threading::ThreadManager::Task const& what_, uint64_t where_, int64_t when_, int64_t stage_) : what(what_) , where(where_) , when(when_) , taken(false) , stage(stage_) { // } inline sprawl::threading::ThreadManager::TaskInfo::TaskInfo(TaskInfo&& other) : what(std::move(other.what)) , where(other.where) , when(other.when) , taken(false) , stage(0) { } sprawl::threading::ThreadManager::TaskInfo& sprawl::threading::ThreadManager::TaskInfo::operator=(TaskInfo&& other) { what = std::move(other.what); where = other.where; when = other.when; stage = other.stage; return *this; } sprawl::threading::ThreadManager::ThreadManager() : m_taskQueue() , m_flagGroups() , m_mainThreadMailbox(nullptr) , m_mainThreadQueue(nullptr) , m_threads() , m_mailmanThread(std::bind(&ThreadManager::mailMan_, this)) , m_running(false) , m_currentStage(0) , m_maxStage(0) , m_syncState(SyncState::None) , m_workerSyncEvent() , m_mailmanSyncEvent() , m_syncCount(0) , m_mailReady() { // } sprawl::threading::ThreadManager::~ThreadManager() { TaskInfo* task; while(m_taskQueue.Dequeue(task)) { delete task; } for(auto& group : m_flagGroups) { delete group.Value(); } for(auto& threadInfo : m_threads) { delete threadInfo.data; delete threadInfo.thread; } } void sprawl::threading::ThreadManager::AddThread(uint64_t threadFlags, const char* const threadName) { ThreadData* data = new ThreadData(threadFlags); m_threads.PushBack(ThreadInfo(data, threadName, std::bind(&ThreadManager::eventLoop_, this, data))); FlagGroup*& group = m_flagGroups[threadFlags]; if(group == nullptr) { group = new FlagGroup(); } group->events.PushBack(&data->mailbox); } void sprawl::threading::ThreadManager::AddThread(uint64_t threadFlags) { ThreadData* data = new ThreadData(threadFlags); m_threads.PushBack(ThreadInfo(data, std::bind(&ThreadManager::eventLoop_, this, data))); FlagGroup*& group = m_flagGroups[threadFlags]; if(group == nullptr) { group = new FlagGroup(); } group->events.PushBack(&data->mailbox); } void sprawl::threading::ThreadManager::AddThreads(uint64_t threadFlags, int count, const char* const threadName) { for(int i = 0; i < count; ++i) { AddThread(threadFlags, threadName); } } void sprawl::threading::ThreadManager::AddThreads(uint64_t threadFlags, int count) { for(int i = 0; i < count; ++i) { AddThread(threadFlags); } } void sprawl::threading::ThreadManager::AddTask(sprawl::threading::ThreadManager::Task&& task, uint64_t threadFlags, int64_t whenNanosecs) { pushTask_(new TaskInfo(std::move(task), threadFlags, whenNanosecs)); } void sprawl::threading::ThreadManager::AddTask(sprawl::threading::ThreadManager::Task const& task, uint64_t threadFlags, int64_t whenNanosecs) { pushTask_(new TaskInfo(task, threadFlags, whenNanosecs)); } void sprawl::threading::ThreadManager::AddTaskStaged(uint64_t stage, sprawl::threading::ThreadManager::Task&& task, uint64_t threadFlags, int64_t whenNanosecs) { pushTask_(new TaskInfo(std::move(task), threadFlags, whenNanosecs, stage)); } void sprawl::threading::ThreadManager::AddTaskStaged(uint64_t stage, sprawl::threading::ThreadManager::Task const& task, uint64_t threadFlags, int64_t whenNanosecs) { pushTask_(new TaskInfo(task, threadFlags, whenNanosecs, stage)); } void sprawl::threading::ThreadManager::SetMaxStage(uint64_t maxStage) { m_maxStage = maxStage; } void sprawl::threading::ThreadManager::AddFutureTask(sprawl::threading::ThreadManager::Task&& task, uint64_t threadFlags, int64_t nanosecondsFromNow) { AddTask(std::move(task), threadFlags, nanosecondsFromNow + time::Now()); } void sprawl::threading::ThreadManager::AddFutureTask(sprawl::threading::ThreadManager::Task const& task, uint64_t threadFlags, int64_t nanosecondsFromNow) { AddTask(task, threadFlags, nanosecondsFromNow + time::Now()); } void sprawl::threading::ThreadManager::AddFutureTaskStaged(uint64_t stage, sprawl::threading::ThreadManager::Task&& task, uint64_t threadFlags, int64_t nanosecondsFromNow) { AddTaskStaged(stage, std::move(task), threadFlags, nanosecondsFromNow + time::Now()); } void sprawl::threading::ThreadManager::AddFutureTaskStaged(uint64_t stage, sprawl::threading::ThreadManager::Task const& task, uint64_t threadFlags, int64_t nanosecondsFromNow) { AddTaskStaged(stage, task, threadFlags, nanosecondsFromNow + time::Now()); } void sprawl::threading::ThreadManager::RunStaged(uint64_t thisThreadFlags) { m_running = true; m_currentStage = 1; m_syncState = SyncState::Threads; for(auto& threadData : m_threads) { threadData.thread->Start(); } AddThread(thisThreadFlags, "Main Thread"); m_mainThreadMailbox = &m_threads.Back().data->mailbox; m_mainThreadQueue = &m_flagGroups.Get(m_threads.Back().data->flags)->taskQueue; size_t threadCount = m_threads.Size() - 1; while(m_syncCount != threadCount) { m_workerSyncEvent.Wait(); } m_mailmanThread.Start(); while(m_running) { RunNextStage(); } } void sprawl::threading::ThreadManager::Run(uint64_t thisThreadFlags) { m_running = true; for(auto& threadData : m_threads) { threadData.thread->Start(); } AddThread(thisThreadFlags, "Main Thread"); m_mainThreadMailbox = &m_threads.Back().data->mailbox; m_mainThreadQueue = &m_flagGroups.Get(m_threads.Back().data->flags)->taskQueue; m_mailmanThread.Start(); eventLoop_(m_threads.Back().data); } void sprawl::threading::ThreadManager::Start(uint64_t thisThreadFlags) { m_running = true; if(m_maxStage != 0) { m_syncState = SyncState::Threads; m_currentStage = 1; } for(auto& threadInfo : m_threads) { threadInfo.thread->Start(); } AddThread(thisThreadFlags, "Main Thread"); m_mainThreadMailbox = &m_threads.Back().data->mailbox; m_mainThreadQueue = &m_flagGroups.Get(m_threads.Back().data->flags)->taskQueue; if(m_maxStage != 0) { size_t threadCount = m_threads.Size() - 1; while(m_syncCount != threadCount) { m_workerSyncEvent.Wait(); } } m_mailmanThread.Start(); } void sprawl::threading::ThreadManager::Pump() { TaskInfo* task; while(m_mainThreadQueue->Dequeue(task)) { bool expected = false; if(task->taken.compare_exchange_strong(expected, true)) { task->what(); delete task; } } } void sprawl::threading::ThreadManager::pump_() { TaskInfo* task; while(m_mainThreadQueue->Dequeue(task)) { bool expected = false; if(task->taken.compare_exchange_strong(expected, true)) { task->what(); delete task; } } } uint64_t sprawl::threading::ThreadManager::RunNextStage() { //Don't wait on the main thread... size_t threadCount = m_threads.Size() - 1; m_syncCount = 0; m_syncState = SyncState::None; for(auto& threadInfo : m_threads) { threadInfo.data->mailbox.Notify(); } while(m_syncCount != threadCount) { m_workerSyncEvent.Wait(); } //Ensure any existing events on the mailmanSyncEvent are consumed m_mailReady.Notify(); m_mailmanSyncEvent.Wait(); //Let the mailman deliver mail... new mail for this stage will come in the next frame. m_mailReady.Notify(); m_mailmanSyncEvent.Wait(); //Stop mail delivery and run threads with only the mail that's already been delivered. m_syncState = SyncState::Mailman; m_mailReady.Notify(); m_mailmanSyncEvent.Wait(); pump_(); m_syncCount = 0; m_syncState = SyncState::Threads; for(auto& threadInfo : m_threads) { threadInfo.data->mailbox.Notify(); } while(m_syncCount != threadCount) { m_workerSyncEvent.Wait(); } uint64_t stageJustRun = m_currentStage; m_currentStage *= 2; if(m_currentStage > m_maxStage) { m_currentStage = 1; } return stageJustRun; } //Sync is basically the opposite of RunNextStage... instead of doing start stage, run, finish stage, it does finish stage, start stage, and expects the user to call Pump() uint64_t sprawl::threading::ThreadManager::Sync() { //Don't wait on the main thread... size_t threadCount = m_threads.Size() - 1; m_syncCount = 0; m_syncState = SyncState::Threads; for(auto& threadInfo : m_threads) { threadInfo.data->mailbox.Notify(); } while(m_syncCount != threadCount) { m_workerSyncEvent.Wait(); } uint64_t stageJustRun = m_currentStage; m_currentStage *= 2; if(m_currentStage > m_maxStage) { m_currentStage = 1; } m_syncCount = 0; m_syncState = SyncState::None; for(auto& threadInfo : m_threads) { threadInfo.data->mailbox.Notify(); } while(m_syncCount != threadCount) { m_workerSyncEvent.Wait(); } //Ensure any existing events on the mailmanSyncEvent are consumed m_mailReady.Notify(); m_mailmanSyncEvent.Wait(); //Let the mailman deliver mail... new mail for this stage will come in the next frame. m_mailReady.Notify(); m_mailmanSyncEvent.Wait(); //Stop mail delivery and run threads with only the mail that's already been delivered. m_syncState = SyncState::Mailman; m_mailReady.Notify(); m_mailmanSyncEvent.Wait(); return stageJustRun; } void sprawl::threading::ThreadManager::Wait() { m_mainThreadMailbox->Wait(); } void sprawl::threading::ThreadManager::Stop() { m_syncState = SyncState::None; m_running = false; m_mailReady.Notify(); for(auto& threadInfo : m_threads) { threadInfo.data->mailbox.Notify(); } } void sprawl::threading::ThreadManager::ShutDown() { if(m_running) { Stop(); } for(auto& threadInfo : m_threads) { if(threadInfo.thread->Joinable() && threadInfo.thread->GetHandle() != sprawl::this_thread::GetHandle()) { threadInfo.thread->Join(); } } m_mailmanThread.Join(); m_threads.Clear(); } void sprawl::threading::ThreadManager::pushTask_(TaskInfo* task) { m_taskQueue.Enqueue(task); m_mailReady.Notify(); } void sprawl::threading::ThreadManager::eventLoop_(ThreadData* threadData) { collections::ConcurrentQueue<TaskInfo*>& queue = m_flagGroups.Get(threadData->flags)->taskQueue; Event& mailbox = threadData->mailbox; while(m_running) { TaskInfo* task; while(queue.Dequeue(task)) { bool expected = false; if(task->taken.compare_exchange_strong(expected, true)) { task->what(); delete task; } } if(m_syncState == SyncState::Threads) { ++m_syncCount; m_workerSyncEvent.Notify(); while(m_syncState == SyncState::Threads) { mailbox.Wait(); } ++m_syncCount; m_workerSyncEvent.Notify(); } else { mailbox.Wait(); } } } #include <map> void sprawl::threading::ThreadManager::mailMan_() { std::map<int64_t, TaskInfo*> prioritizedTasks; while(m_running) { TaskInfo* task; while(m_taskQueue.Dequeue(task)) { while(prioritizedTasks.find(task->when) != prioritizedTasks.end()) { ++task->when; } prioritizedTasks.emplace(task->when, task); } collections::HashSet<int64_t> keysToDelete; int64_t lastTime = 0; for(auto& flagGroup : m_flagGroups) { bool addedTask = false; auto it = prioritizedTasks.begin(); for(;it != prioritizedTasks.end() && it->first <= time::Now();++it) { if(it->second->where != 0 && (it->second->where & flagGroup.Key()) == 0) { continue; } if(m_currentStage != 0 && it->second->stage != 0 && (it->second->stage & m_currentStage) == 0) { continue; } flagGroup.Value()->taskQueue.Enqueue(it->second); addedTask = true; keysToDelete.Insert(it->first); } if(addedTask) { for(auto& event : flagGroup.Value()->events) { event->Notify(); } } if(it != prioritizedTasks.end()) { lastTime = it->first; } } for(auto& key : keysToDelete) { prioritizedTasks.erase(key.Key()); } auto syncStatePreNotify = m_syncState.load(); m_mailmanSyncEvent.Notify(); if (syncStatePreNotify != SyncState::None) { while(m_syncState != SyncState::None) { m_mailReady.Wait(); } } else { if(lastTime != 0) { m_mailReady.WaitUntil(lastTime); } else { m_mailReady.Wait(); } } } for(auto& task : prioritizedTasks) { delete task.second; } }
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#1 | 23398 | ququlala | "Forking branch Mainline of shadauxcat-libsprawl to ququlala-libsprawl." | ||
//guest/ShadauxCat/Sprawl/Mainline/threading/threadmanager.cpp | |||||
#7 | 16070 | ShadauxCat |
Fixed possible hang during ThreadManager::RunStaged() and ThreadManager::Sync() in a pre-emptive environment if the mailman thread happens to be pre-empted right after calling m_mailSyncEvent.Notify(). Probably a better fix for this will come in the future. I'm overall not fully sold on the current implementation of Sync(). #review-16071 |
||
#6 | 16052 | ShadauxCat |
- Changed default block size for concurrent queue to a more reasonable value - Changed some memory orders to memory_order_seq_cst when they don't actually need to be that to get around a bug in visual studio 2013 - debug builds assert when memory_order_acq_rel is used for a compare_exchange_strong (this is a standard library bug and is fixed in VS2015) - Added Event API - events are an alternative to condition variables that do not require a mutex and are guaranteed not to miss any signals, even if the signal comes while the thread is not listening for it. Unlike condition variables, however, they do not support broadcasting (and in fact, in general, are not safe to use with multiple threads listening for the same event simultaneously - though notifying on the same event is fine) - Rewrote ThreadManager around ConcurrentQueue and Event API so it is now lock-free. Also improved some behaviors of the staged thread manager operation so it now supports tasks that can be run on multiple stages via a bitmask. - Fixed an issue where the Coroutine copy constructor was calling the std::function constructor instead and another where initializing with a stack might try to call the wrong constructor and vice-versa - Fixed Coroutine never calling munmap() on its stack in linux and causing a memory leak - Added default arguments to time functions - Attempted to fix some issues with BinaryTree. Fixed some but not all. It's currently not suitable for use, sadly. - Logging Improvements: - - Added thread ID to logging - - Fixed some issues with category handlers - - Added backtraces - - Added the following additional log macros: - - - LOG_IF - - - LOG_EVERY_N - - - LOG_FIRST_N - - - LOG_IF_EVERY_N - - - LOG_IF_FIRST_N - - - LOG_ASSERT - - Added the ability to set extra info callbacks to get data such as script backtraces - - Removed the thread-related handlers and replaced them with RunHandler_Threaded and RunHandler_ThreadManager, which will enable any passed-in handler to be run in a threaded fashion - Removed StaticPoolAllocator and renamed DynamicPoolAllocator to PoolAllocator; adjusted unit tests accordingly - PoolAllocator now allocates its pool with mmap and VirtualAlloc, rather than with malloc - Fixed a bug with Vector copy assignment operator - Improved performance of StringBuilder considerably for cases where there are no modifier strings - Removed Copy-On-Write behavior of JSONToken as it was broken; copies are now performed with explicit DeepCopy() and ShallowCopy() functions - Fixed some parser bugs with JSONToken - Added iteration to JSONToken to iterate its children - Fixed crash when reading a negative number of bytes from a file - Changed StringBuilder to favor speed instead of memory by default - Added some performance unit tests for JSON token #review-16053 |
||
#5 | 14833 | ShadauxCat |
First checkin of logging module. Also fixes the following issues: -Added UpperBound() and LowerBound() to BinaryTree and created appropriate unit tests -Added Sync() to ThreadManager to force it to run all tasks to completion and not return until it has no tasks left -Fixed a bug in String::format() where a non-numeric value inside {} would be treated as an empty {}; it now simply prints whatever the value was. (i.e., "{blah}".format(foo) simply returns "{blah}") -Added Reset() to sprawl::StringBuilder -Disabled the switch-enum warning flag in gcc because it's stupid and ridiculous that a default case doesn't shut it up -Made sprawl::Mutex movable. This may turn out to be a bad idea but it enabled keeping them in a map. -Fixed a name collission between HashMap and BinaryTree; both defined sprawl::collections::detail::UnderlyingType and ::MethodType. Prefixed the ones in BinaryTree with "Tree". This isn't the best solution, but it works for now. #review-14834 |
||
#4 | 14783 | ShadauxCat |
Style corrections (placement of const) #review-14784 |
||
#3 | 14163 | ShadauxCat |
-Renamed HashMap functions to follow coding style. Only begin, end, find, and variants are left lowercase, in keeping with C++ algorithm and range-based for support. -Fixed some accounting issues with list and forwardlist; size wasn't properly being maintained. -Made a small pedantic change to ThreadManager to ensure that m_numThreadsSynced got reset to 0 before the NotifyAll() to eliminate the miniscule potential for deadlock it would cause if it happened after another thread had already woken up. #review-14164 |
||
#2 | 14161 | ShadauxCat |
-Added staged option for ThreadManager and corresponding unit test -Added operator[] and getOrInsert() in HashMap. getOrInsert() doesn't follow standard but it's consistent with the rest of the HashMap interface; I'll change them when I go back and redo that interface to fit the style. #review-14162 |
||
#1 | 12508 | ShadauxCat |
-Added threading library. Currently only functional for Linux; Windows will fail to link. (I will fix this soon.) -Fixed missing move and copy constructors in List and ForwardList -Fixed broken move constructor in HashMap -Fixed missing const get() in HashMap -Fixed broken operator-> in ListIterator -Added sprawl::noncopyable -Added sketch headers for filesystem library -Made StringLiteral hashable, added special hashes for pointers and integers in murmur3 -Fixed compiler warning in async_network -Updated memory allocators to use new threading library for mutexes -Added accessibility to sprawl::StringLiteral to be able toa ccess its pointer and length and perform pointer comparisons #review-12504 |