namespace sprawl { namespace threading { class ThreadManager; } } #include <stdint.h> #include <functional> #include "../time/time.hpp" #include "../collections/Vector.hpp" #include "../collections/ConcurrentQueue.hpp" #include "../collections/BinaryTree.hpp" #include "../collections/HashMap.hpp" #include "thread.hpp" #include "mutex.hpp" #include "condition_variable.hpp" #include "event.hpp" #include <atomic> class sprawl::threading::ThreadManager { public: typedef std::function<void()> Task; ThreadManager(); ~ThreadManager(); void AddThread(uint64_t threadFlags, char const* const threadName); void AddThread(uint64_t threadFlags); void AddThreads(uint64_t threadFlags, int count, char const* const threadName); void AddThreads(uint64_t threadFlags, int count); void AddTask(Task&& task, uint64_t threadFlags, int64_t whenNanosecs = time::Now(time::Resolution::Nanoseconds)); void AddTask(Task const& task, uint64_t threadFlags, int64_t whenNanosecs = time::Now(time::Resolution::Nanoseconds)); void AddTaskStaged(uint64_t stage, Task&& task, uint64_t threadFlags, int64_t whenNanosecs = time::Now(time::Resolution::Nanoseconds)); void AddTaskStaged(uint64_t stage, Task const& task, uint64_t threadFlags, int64_t whenNanosecs = time::Now(time::Resolution::Nanoseconds)); void SetMaxStage(uint64_t maxStage); void AddFutureTask(Task&& task, uint64_t threadFlags, int64_t nanosecondsFromNow); void AddFutureTask(Task const& task, uint64_t threadFlags, int64_t nanosecondsFromNow); void AddFutureTaskStaged(uint64_t stage, Task&& task, uint64_t threadFlags, int64_t nanosecondsFromNow); void AddFutureTaskStaged(uint64_t stage, Task const& task, uint64_t threadFlags, int64_t nanosecondsFromNow); /** * @brief Start all threads and include the calling thread in a loop controlled by the thread manager * @param thisThreadFlags The flags that apply to the calling thread */ void Run(uint64_t thisThreadFlags); void RunStaged(uint64_t thisThreadFlags); /** * @brief Start all threads but do not 
block on the calling thread. * @details If thisThreadFlags is not 0, the calling thread will be added to the thread pool. * It will then be up to the calling thread to call Pump() to execute the tasks * that get queued up for it. * @param thisThreadFlags The flags that apply to the calling thread */ void Start(uint64_t thisThreadFlags); uint64_t RunNextStage(); void Pump(); void Wait(); uint64_t Sync(); void Stop(); void ShutDown(); private: struct TaskInfo { TaskInfo(); TaskInfo(Task&& what_, uint64_t where_, int64_t when_); TaskInfo(Task const& what_, uint64_t where_, int64_t when_); TaskInfo(Task&& what_, uint64_t where_, int64_t when_, int64_t stage); TaskInfo(Task const& what_, uint64_t where_, int64_t when_, int64_t stage); TaskInfo(TaskInfo&& other); TaskInfo& operator=(TaskInfo&& other); Task what; uint64_t where; int64_t when; std::atomic<bool> taken; uint64_t stage; inline int64_t When() { return when; } }; struct ThreadData { explicit ThreadData(uint64_t flags_) : flags(flags_) , mailbox() { } uint64_t flags; Event mailbox; }; struct ThreadInfo { ThreadInfo(ThreadData* data_, std::function<void()> fn) : data(data_) , thread(new Thread(fn)) { } ThreadInfo(ThreadData* data_, char const* const threadName, std::function<void()> fn) : data(data_) , thread(new Thread(threadName, fn)) { } ThreadInfo(ThreadData* data_) : data(data_) , thread(nullptr) { } ThreadInfo(ThreadInfo&& other) : data(other.data) , thread(other.thread) { other.data = nullptr; other.thread = nullptr; } ~ThreadInfo() { if(data) { delete data; } if(thread) { delete thread; } } ThreadData* data; Thread* thread; }; struct FlagGroup { collections::Vector<Event*> events; collections::ConcurrentQueue<TaskInfo*> taskQueue; }; void pump_(); void pushTask_(TaskInfo* info); void eventLoop_(ThreadData* threadData); void mailMan_(); collections::ConcurrentQueue<TaskInfo*> m_taskQueue; collections::BasicHashMap<int64_t, FlagGroup*> m_flagGroups; Event* m_mainThreadMailbox; 
collections::ConcurrentQueue<TaskInfo*>* m_mainThreadQueue; collections::Vector<ThreadInfo> m_threads; Thread m_mailmanThread; std::atomic<bool> m_running; uint64_t m_currentStage; uint64_t m_maxStage; enum class SyncState { None, Mailman, Threads, }; std::atomic<SyncState> m_syncState; Event m_workerSyncEvent; Event m_mailmanSyncEvent; std::atomic<size_t> m_syncCount; Event m_mailReady; };
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#2 | 18645 | brandon_m_bare | Integrated latest version of libsprawl. | ||
#1 | 15089 | brandon_m_bare | First integration of sprawl. | ||
//guest/ShadauxCat/Sprawl/Mainline/threading/threadmanager.hpp | |||||
#3 | 14833 | ShadauxCat |
First checkin of logging module. Also fixes the following issues: -Added UpperBound() and LowerBound() to BinaryTree and created appropriate unit tests -Added Sync() to ThreadManager to force it to run all tasks to completion and not return until it has no tasks left -Fixed a bug in String::format() where a non-numeric value inside {} would be treated as an empty {}; it now simply prints whatever the value was. (i.e., "{blah}".format(foo) simply returns "{blah}") -Added Reset() to sprawl::StringBuilder -Disabled the switch-enum warning flag in gcc because it's stupid and ridiculous that a default case doesn't shut it up -Made sprawl::Mutex movable. This may turn out to be a bad idea but it enabled keeping them in a map. -Fixed a name collision between HashMap and BinaryTree; both defined sprawl::collections::detail::UnderlyingType and ::MethodType. Prefixed the ones in BinaryTree with "Tree". This isn't the best solution, but it works for now. #review-14834 |
||
#2 | 14161 | ShadauxCat |
-Added staged option for ThreadManager and corresponding unit test -Added operator[] and getOrInsert() in HashMap. getOrInsert() doesn't follow standard but it's consistent with the rest of the HashMap interface; I'll change them when I go back and redo that interface to fit the style. #review-14162 |
||
#1 | 12508 | ShadauxCat |
-Added threading library. Currently only functional for Linux; Windows will fail to link. (I will fix this soon.) -Fixed missing move and copy constructors in List and ForwardList -Fixed broken move constructor in HashMap -Fixed missing const get() in HashMap -Fixed broken operator-> in ListIterator -Added sprawl::noncopyable -Added sketch headers for filesystem library -Made StringLiteral hashable, added special hashes for pointers and integers in murmur3 -Fixed compiler warning in async_network -Updated memory allocators to use new threading library for mutexes -Added accessibility to sprawl::StringLiteral to be able to access its pointer and length and perform pointer comparisons #review-12504 |