#include "../../collections/ConcurrentQueue.hpp"
#include <gtest/gtest.h>
#include "../../collections/HashMap.hpp"
#include "../../collections/Deque.hpp"
#include "../../threading/thread.hpp"
#include "../../threading/mutex.hpp"
#include "../../time/time.hpp"
#include <math.h>
#include <atomic>
#include <cassert>
#include <cinttypes>
#include <cstddef>
#include <cstdint>
#include <cstdio>

#ifndef _WIN32
#include <malloc.h>
#endif

// Multi-threaded tests and timing comparisons for sprawl::collections::ConcurrentQueue.
// Every test runs 10 producer threads and 10 consumer threads against shared,
// namespace-scope state, so each test resets the counters it relies on before starting.
namespace ConcurrentQueue
{
	// Number of values each producer thread inserts; every test uses 10 producers.
	const int numInsertsPerThread = 100000;

	// Queue under test, shared by all "fast" producers and consumers.
	sprawl::collections::ConcurrentQueue<int, numInsertsPerThread * 10> queue;

	// Producer: pushes the values [startingPoint, startingPoint + numInsertsPerThread) onto `queue`.
	void Enqueue(int startingPoint)
	{
		for(int i = startingPoint; i < startingPoint + numInsertsPerThread; ++i)
		{
			queue.Enqueue(i);
		}
	}

	// Per-value dequeue tallies merged from all consumers; guarded by `mtx`.
	sprawl::collections::BasicHashMap<int, int> results;
	sprawl::threading::Mutex mtx;

	// Consumer: dequeues until the first failed Dequeue(), counting each value locally,
	// then merges the local tallies into `results` while holding `mtx`.
	void Dequeue()
	{
		sprawl::collections::BasicHashMap<int, int> localResults;
		for(;;)
		{
			int i;
			if(!queue.Dequeue(i))
			{
				break;
			}
			++localResults[i];
		}
		sprawl::threading::ScopedLock lock(mtx);
		for(auto& kvp : localResults)
		{
			results[kvp.Key()] += kvp.Value();
		}
	}

	// Total number of items removed by DequeueSimple across all consumer threads.
	// Must be reset to 0 by every test that starts DequeueSimple threads.
	std::atomic<int> count(0);

	// Consumer: spins until numInsertsPerThread * 10 items in total have been dequeued from `queue`.
	void DequeueSimple()
	{
		while(count < numInsertsPerThread * 10)
		{
			int i;
			if(queue.Dequeue(i))
			{
				++count;
			}
		}
	}

	// Disabled in the original source; kept verbatim for reference.
	// TEST(ConcurrentQueue, WorksOnManyThreads)
	// {
	// 	sprawl::threading::Thread t1(Enqueue, 0);
	// 	sprawl::threading::Thread t2(Enqueue, numInsertsPerThread);
	// 	sprawl::threading::Thread t3(Enqueue, numInsertsPerThread * 2);
	// 	sprawl::threading::Thread t4(Enqueue, numInsertsPerThread * 3);
	// 	sprawl::threading::Thread t5(Enqueue, numInsertsPerThread * 4);
	// 	sprawl::threading::Thread t6(Enqueue, numInsertsPerThread * 5);
	// 	sprawl::threading::Thread t7(Enqueue, numInsertsPerThread * 6);
	// 	sprawl::threading::Thread t8(Enqueue, numInsertsPerThread * 7);
	// 	sprawl::threading::Thread t9(Enqueue, numInsertsPerThread * 8);
	// 	sprawl::threading::Thread t10(Enqueue, numInsertsPerThread * 9);
	// 	sprawl::threading::Thread t11(Dequeue);
	// 	sprawl::threading::Thread t12(Dequeue);
	// 	sprawl::threading::Thread t13(Dequeue);
	// 	sprawl::threading::Thread t14(Dequeue);
	// 	sprawl::threading::Thread t15(Dequeue);
	// 	sprawl::threading::Thread t16(Dequeue);
	// 	sprawl::threading::Thread t17(Dequeue);
	// 	sprawl::threading::Thread t18(Dequeue);
	// 	sprawl::threading::Thread t19(Dequeue);
	// 	sprawl::threading::Thread t20(Dequeue);
	// 	t1.Start();
	// 	t11.Start();
	// 	t2.Start();
	// 	t12.Start();
	// 	t3.Start();
	// 	t13.Start();
	// 	t4.Start();
	// 	t14.Start();
	// 	t5.Start();
	// 	t15.Start();
	// 	t6.Start();
	// 	t16.Start();
	// 	t7.Start();
	// 	t17.Start();
	// 	t8.Start();
	// 	t18.Start();
	// 	t9.Start();
	// 	t19.Start();
	// 	t10.Start();
	// 	t20.Start();
	// 	t1.Join();
	// 	t11.Join();
	// 	t2.Join();
	// 	t12.Join();
	// 	t3.Join();
	// 	t13.Join();
	// 	t4.Join();
	// 	t14.Join();
	// 	t5.Join();
	// 	t15.Join();
	// 	t6.Join();
	// 	t16.Join();
	// 	t7.Join();
	// 	t17.Join();
	// 	t8.Join();
	// 	t18.Join();
	// 	t9.Join();
	// 	t19.Join();
	// 	t10.Join();
	// 	t20.Join();
	// 	for(int i = 0; i < numInsertsPerThread * 10; ++i)
	// 	{
	// 		ASSERT_TRUE(results.Has(i));
	// 		ASSERT_EQ(1, results.Get(i));
	// 	}
	// 	int i;
	// 	ASSERT_FALSE(queue.Dequeue(i));
	// }

	// Mutex-guarded baseline container for the lock-free vs. locked timing comparison.
	sprawl::collections::Deque<int> deque(sprawl::collections::Capacity(numInsertsPerThread * 10));

	// Producer for the locked baseline: takes `mtx` for every single PushBack.
	void EnqueueDeque(int startingPoint)
	{
		for(int i = startingPoint; i < startingPoint + numInsertsPerThread; ++i)
		{
			sprawl::threading::ScopedLock lock(mtx);
			deque.PushBack(i);
		}
	}

	// Total number of items removed by DequeueDeque. It is atomic because the loop
	// condition below reads it without holding `mtx` while other consumers write it;
	// as a plain int that was a data race.
	std::atomic<int> countDeque(0);

	// Consumer for the locked baseline: spins until numInsertsPerThread * 10 items have been popped.
	void DequeueDeque()
	{
		while(countDeque < numInsertsPerThread * 10)
		{
			sprawl::threading::ScopedLock lock(mtx);
			if(!deque.Empty())
			{
				deque.PopBack();
				++countDeque;
			}
		}
	}

	// Times 10 producers + 10 consumers on the lock-free queue against the same workload
	// on a mutex-guarded Deque and expects the lock-free run to take less time.
	TEST(ConcurrentQueue, FasterThanQueueWithLocks)
	{
		// The consumers terminate based on these counters, so start every run from zero.
		count = 0;
		countDeque = 0;

		sprawl::threading::Thread t1(Enqueue, 0);
		sprawl::threading::Thread t2(Enqueue, numInsertsPerThread);
		sprawl::threading::Thread t3(Enqueue, numInsertsPerThread * 2);
		sprawl::threading::Thread t4(Enqueue, numInsertsPerThread * 3);
		sprawl::threading::Thread t5(Enqueue, numInsertsPerThread * 4);
		sprawl::threading::Thread t6(Enqueue, numInsertsPerThread * 5);
		sprawl::threading::Thread t7(Enqueue, numInsertsPerThread * 6);
		sprawl::threading::Thread t8(Enqueue, numInsertsPerThread * 7);
		sprawl::threading::Thread t9(Enqueue, numInsertsPerThread * 8);
		sprawl::threading::Thread t10(Enqueue, numInsertsPerThread * 9);
		sprawl::threading::Thread t11(DequeueSimple);
		sprawl::threading::Thread t12(DequeueSimple);
		sprawl::threading::Thread t13(DequeueSimple);
		sprawl::threading::Thread t14(DequeueSimple);
		sprawl::threading::Thread t15(DequeueSimple);
		sprawl::threading::Thread t16(DequeueSimple);
		sprawl::threading::Thread t17(DequeueSimple);
		sprawl::threading::Thread t18(DequeueSimple);
		sprawl::threading::Thread t19(DequeueSimple);
		sprawl::threading::Thread t20(DequeueSimple);

		const int64_t lockFreeStart = sprawl::time::Now(sprawl::time::Resolution::Milliseconds);

		// Producers and consumers are started (and joined) interleaved.
		t1.Start();
		t11.Start();
		t2.Start();
		t12.Start();
		t3.Start();
		t13.Start();
		t4.Start();
		t14.Start();
		t5.Start();
		t15.Start();
		t6.Start();
		t16.Start();
		t7.Start();
		t17.Start();
		t8.Start();
		t18.Start();
		t9.Start();
		t19.Start();
		t10.Start();
		t20.Start();

		t1.Join();
		t11.Join();
		t2.Join();
		t12.Join();
		t3.Join();
		t13.Join();
		t4.Join();
		t14.Join();
		t5.Join();
		t15.Join();
		t6.Join();
		t16.Join();
		t7.Join();
		t17.Join();
		t8.Join();
		t18.Join();
		t9.Join();
		t19.Join();
		t10.Join();
		t20.Join();

		const int64_t lockFreeTime = sprawl::time::Now(sprawl::time::Resolution::Milliseconds) - lockFreeStart;

		sprawl::threading::Thread t21(EnqueueDeque, 0);
		sprawl::threading::Thread t22(EnqueueDeque, numInsertsPerThread);
		sprawl::threading::Thread t23(EnqueueDeque, numInsertsPerThread * 2);
		sprawl::threading::Thread t24(EnqueueDeque, numInsertsPerThread * 3);
		sprawl::threading::Thread t25(EnqueueDeque, numInsertsPerThread * 4);
		sprawl::threading::Thread t26(EnqueueDeque, numInsertsPerThread * 5);
		sprawl::threading::Thread t27(EnqueueDeque, numInsertsPerThread * 6);
		sprawl::threading::Thread t28(EnqueueDeque, numInsertsPerThread * 7);
		sprawl::threading::Thread t29(EnqueueDeque, numInsertsPerThread * 8);
		sprawl::threading::Thread t30(EnqueueDeque, numInsertsPerThread * 9);
		sprawl::threading::Thread t31(DequeueDeque);
		sprawl::threading::Thread t32(DequeueDeque);
		sprawl::threading::Thread t33(DequeueDeque);
		sprawl::threading::Thread t34(DequeueDeque);
		sprawl::threading::Thread t35(DequeueDeque);
		sprawl::threading::Thread t36(DequeueDeque);
		sprawl::threading::Thread t37(DequeueDeque);
		sprawl::threading::Thread t38(DequeueDeque);
		sprawl::threading::Thread t39(DequeueDeque);
		sprawl::threading::Thread t40(DequeueDeque);

		const int64_t dequeStart = sprawl::time::Now(sprawl::time::Resolution::Milliseconds);

		t21.Start();
		t31.Start();
		t22.Start();
		t32.Start();
		t23.Start();
		t33.Start();
		t24.Start();
		t34.Start();
		t25.Start();
		t35.Start();
		t26.Start();
		t36.Start();
		t27.Start();
		t37.Start();
		t28.Start();
		t38.Start();
		t29.Start();
		t39.Start();
		t30.Start();
		t40.Start();

		t21.Join();
		t31.Join();
		t22.Join();
		t32.Join();
		t23.Join();
		t33.Join();
		t24.Join();
		t34.Join();
		t25.Join();
		t35.Join();
		t26.Join();
		t36.Join();
		t27.Join();
		t37.Join();
		t28.Join();
		t38.Join();
		t29.Join();
		t39.Join();
		t30.Join();
		t40.Join();

		const int64_t dequeTime = sprawl::time::Now(sprawl::time::Resolution::Milliseconds) - dequeStart;

		// PRId64 selects the correct printf length modifier for int64_t on every platform.
		std::printf("Lock free: %" PRId64 " ms, deque: %" PRId64 " ms\n", lockFreeTime, dequeTime);

		EXPECT_LT(lockFreeTime, dequeTime);

		// for(int i = 0; i < numInsertsPerThread * 10; ++i)
		// {
		// 	int val;
		// 	ASSERT_TRUE(queue.Dequeue(val)) << "Failed to dequeue item " << i;
		// }
		// int i;
		// ASSERT_FALSE(queue.Dequeue(i));
	}

	// Reference queue for the second timing comparison: a bounded ring buffer of cells,
	// each tagged with an atomic sequence number that producers and consumers use to
	// claim slots via compare-exchange.
	// NOTE(review): appears to be the bounded MPMC queue published on 1024cores.net
	// (the test below refers to it as the "1024 cores queue") - confirm attribution.
	template<typename T>
	class mpmc_bounded_queue
	{
	public:
		// buffer_size must be a power of two and at least 2 (checked by assert only).
		mpmc_bounded_queue(size_t buffer_size)
			: buffer_(new cell_t [buffer_size])
			, buffer_mask_(buffer_size - 1)
		{
			assert((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0));
			// Cell i starts with sequence i, i.e. "free for the enqueue at position i".
			for (size_t i = 0; i != buffer_size; i += 1)
			{
				buffer_[i].sequence_.store(i, std::memory_order_relaxed);
			}
			enqueue_pos_.store(0, std::memory_order_relaxed);
			dequeue_pos_.store(0, std::memory_order_relaxed);
		}

		~mpmc_bounded_queue()
		{
			delete [] buffer_;
		}

		// Copies `data` into the next free cell. Returns false when the ring is full.
		bool enqueue(T const& data)
		{
			cell_t* cell;
			size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
			for (;;)
			{
				cell = &buffer_[pos & buffer_mask_];
				size_t seq = cell->sequence_.load(std::memory_order_acquire);
				intptr_t dif = (intptr_t)seq - (intptr_t)pos;
				if (dif == 0)
				{
					// Cell is free for this position; try to claim it.
					// On failure compare_exchange_weak reloads `pos` and we retry.
					if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
					{
						break;
					}
				}
				else if (dif < 0)
				{
					// Cell still holds an item that has not been dequeued: queue is full.
					return false;
				}
				else
				{
					// Another producer already advanced past `pos`; refresh and retry.
					pos = enqueue_pos_.load(std::memory_order_relaxed);
				}
			}
			cell->data_ = data;
			// Publish the item: sequence pos + 1 is what dequeue() at `pos` waits for.
			cell->sequence_.store(pos + 1, std::memory_order_release);
			return true;
		}

		// Copies the oldest item into `data`. Returns false when the ring is empty.
		bool dequeue(T& data)
		{
			cell_t* cell;
			size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
			for (;;)
			{
				cell = &buffer_[pos & buffer_mask_];
				size_t seq = cell->sequence_.load(std::memory_order_acquire);
				intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
				if (dif == 0)
				{
					// Cell holds a published item for this position; try to claim it.
					if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
					{
						break;
					}
				}
				else if (dif < 0)
				{
					// Nothing has been published at this position yet: queue is empty.
					return false;
				}
				else
				{
					// Another consumer already advanced past `pos`; refresh and retry.
					pos = dequeue_pos_.load(std::memory_order_relaxed);
				}
			}
			data = cell->data_;
			// Release the cell for the enqueue one full lap (buffer size) later.
			cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);
			return true;
		}

	private:
		struct cell_t
		{
			std::atomic<size_t> sequence_;
			T data_;
		};

		static size_t const cacheline_size = 64;
		typedef char cacheline_pad_t [cacheline_size];

		// The pad members separate the buffer pointer/mask and the two position
		// counters from each other by cacheline_size bytes.
		cacheline_pad_t pad0_;
		cell_t* const buffer_;
		size_t const buffer_mask_;
		cacheline_pad_t pad1_;
		std::atomic<size_t> enqueue_pos_;
		cacheline_pad_t pad2_;
		std::atomic<size_t> dequeue_pos_;
		cacheline_pad_t pad3_;

		// Non-copyable: the queue owns `buffer_`.
		mpmc_bounded_queue(mpmc_bounded_queue const&) = delete;
		void operator = (mpmc_bounded_queue const&) = delete;
	};

	// Smallest power of two that is >= value and >= 2 (the minimum mpmc_bounded_queue size).
	// Integer replacement for pow(2, ceil(log(x) / log(2))), which relied on exact
	// floating-point results and an implicit double -> size_t conversion.
	inline size_t NextPowerOfTwo(size_t value)
	{
		size_t result = 2;
		while(result < value)
		{
			result <<= 1;
		}
		return result;
	}

	// Reference queue sized to hold every item of one test run.
	mpmc_bounded_queue<int> queue1024(NextPowerOfTwo(static_cast<size_t>(numInsertsPerThread) * 10));

	// Producer for the reference queue. enqueue() returns false when the ring is full;
	// retry in that case instead of silently dropping the item, because the consumers
	// only stop after numInsertsPerThread * 10 successful dequeues.
	void Enqueue1024Cores(int startingPoint)
	{
		for(int i = startingPoint; i < startingPoint + numInsertsPerThread; ++i)
		{
			while(!queue1024.enqueue(i))
			{
			}
		}
	}

	// Total number of items removed by Dequeue1024Cores across all consumer threads.
	std::atomic<int> count1024(0);

	// Consumer for the reference queue: spins until numInsertsPerThread * 10 items have been dequeued.
	void Dequeue1024Cores()
	{
		while(count1024 < numInsertsPerThread * 10)
		{
			int i;
			if(queue1024.dequeue(i))
			{
				++count1024;
			}
		}
	}

	// Times 10 producers + 10 consumers on the sprawl queue against the same workload
	// on mpmc_bounded_queue and asserts the sprawl queue takes less time.
	TEST(ConcurrentQueue, PerformanceComparesTo1024CoresQueue)
	{
		int test;
		ASSERT_FALSE(queue.Dequeue(test)) << "Queue did not start clean.";

		// `count` is shared with FasterThanQueueWithLocks, which leaves it at
		// numInsertsPerThread * 10. Without this reset every DequeueSimple thread would
		// return immediately, so the timing below would cover enqueueing only and the
		// queue would be left full.
		count = 0;
		count1024 = 0;

		sprawl::threading::Thread t1(Enqueue, 0);
		sprawl::threading::Thread t2(Enqueue, numInsertsPerThread);
		sprawl::threading::Thread t3(Enqueue, numInsertsPerThread * 2);
		sprawl::threading::Thread t4(Enqueue, numInsertsPerThread * 3);
		sprawl::threading::Thread t5(Enqueue, numInsertsPerThread * 4);
		sprawl::threading::Thread t6(Enqueue, numInsertsPerThread * 5);
		sprawl::threading::Thread t7(Enqueue, numInsertsPerThread * 6);
		sprawl::threading::Thread t8(Enqueue, numInsertsPerThread * 7);
		sprawl::threading::Thread t9(Enqueue, numInsertsPerThread * 8);
		sprawl::threading::Thread t10(Enqueue, numInsertsPerThread * 9);
		sprawl::threading::Thread t11(DequeueSimple);
		sprawl::threading::Thread t12(DequeueSimple);
		sprawl::threading::Thread t13(DequeueSimple);
		sprawl::threading::Thread t14(DequeueSimple);
		sprawl::threading::Thread t15(DequeueSimple);
		sprawl::threading::Thread t16(DequeueSimple);
		sprawl::threading::Thread t17(DequeueSimple);
		sprawl::threading::Thread t18(DequeueSimple);
		sprawl::threading::Thread t19(DequeueSimple);
		sprawl::threading::Thread t20(DequeueSimple);

		const int64_t lockFreeStart = sprawl::time::Now(sprawl::time::Resolution::Milliseconds);

		t1.Start();
		t11.Start();
		t2.Start();
		t12.Start();
		t3.Start();
		t13.Start();
		t4.Start();
		t14.Start();
		t5.Start();
		t15.Start();
		t6.Start();
		t16.Start();
		t7.Start();
		t17.Start();
		t8.Start();
		t18.Start();
		t9.Start();
		t19.Start();
		t10.Start();
		t20.Start();

		t1.Join();
		t11.Join();
		t2.Join();
		t12.Join();
		t3.Join();
		t13.Join();
		t4.Join();
		t14.Join();
		t5.Join();
		t15.Join();
		t6.Join();
		t16.Join();
		t7.Join();
		t17.Join();
		t8.Join();
		t18.Join();
		t9.Join();
		t19.Join();
		t10.Join();
		t20.Join();

		const int64_t lockFreeTime = sprawl::time::Now(sprawl::time::Resolution::Milliseconds) - lockFreeStart;

		sprawl::threading::Thread t21(Enqueue1024Cores, 0);
		sprawl::threading::Thread t22(Enqueue1024Cores, numInsertsPerThread);
		sprawl::threading::Thread t23(Enqueue1024Cores, numInsertsPerThread * 2);
		sprawl::threading::Thread t24(Enqueue1024Cores, numInsertsPerThread * 3);
		sprawl::threading::Thread t25(Enqueue1024Cores, numInsertsPerThread * 4);
		sprawl::threading::Thread t26(Enqueue1024Cores, numInsertsPerThread * 5);
		sprawl::threading::Thread t27(Enqueue1024Cores, numInsertsPerThread * 6);
		sprawl::threading::Thread t28(Enqueue1024Cores, numInsertsPerThread * 7);
		sprawl::threading::Thread t29(Enqueue1024Cores, numInsertsPerThread * 8);
		sprawl::threading::Thread t30(Enqueue1024Cores, numInsertsPerThread * 9);
		sprawl::threading::Thread t31(Dequeue1024Cores);
		sprawl::threading::Thread t32(Dequeue1024Cores);
		sprawl::threading::Thread t33(Dequeue1024Cores);
		sprawl::threading::Thread t34(Dequeue1024Cores);
		sprawl::threading::Thread t35(Dequeue1024Cores);
		sprawl::threading::Thread t36(Dequeue1024Cores);
		sprawl::threading::Thread t37(Dequeue1024Cores);
		sprawl::threading::Thread t38(Dequeue1024Cores);
		sprawl::threading::Thread t39(Dequeue1024Cores);
		sprawl::threading::Thread t40(Dequeue1024Cores);

		const int64_t start1024 = sprawl::time::Now(sprawl::time::Resolution::Milliseconds);

		t21.Start();
		t31.Start();
		t22.Start();
		t32.Start();
		t23.Start();
		t33.Start();
		t24.Start();
		t34.Start();
		t25.Start();
		t35.Start();
		t26.Start();
		t36.Start();
		t27.Start();
		t37.Start();
		t28.Start();
		t38.Start();
		t29.Start();
		t39.Start();
		t30.Start();
		t40.Start();

		t21.Join();
		t31.Join();
		t22.Join();
		t32.Join();
		t23.Join();
		t33.Join();
		t24.Join();
		t34.Join();
		t25.Join();
		t35.Join();
		t26.Join();
		t36.Join();
		t27.Join();
		t37.Join();
		t28.Join();
		t38.Join();
		t29.Join();
		t39.Join();
		t30.Join();
		t40.Join();

		const int64_t time1024 = sprawl::time::Now(sprawl::time::Resolution::Milliseconds) - start1024;

		// PRId64 selects the correct printf length modifier for int64_t on every platform.
		std::printf("Lock free: %" PRId64 " ms, 1024 cores queue: %" PRId64 " ms\n", lockFreeTime, time1024);

		ASSERT_LT(lockFreeTime, time1024);

		// for(int i = 0; i < numInsertsPerThread * 10; ++i)
		// {
		// 	int val;
		// 	ASSERT_TRUE(queue.Dequeue(val)) << "Failed to dequeue item " << i;
		// }
		// int i;
		// ASSERT_FALSE(queue.Dequeue(i)) << "Item " << i << " was still in queue?";
	}

	// Number of values each slow producer inserts; the slow test uses 10 producers.
	const int numSlowInsertsPerThread = 1000;

	// Heap-allocated queue for the slow-producer test; the test deletes it when it finishes.
	sprawl::collections::ConcurrentQueue<int, 10>* queueSlow = new sprawl::collections::ConcurrentQueue<int, 10>();

	// Total number of items dequeued by all DequeueSlow threads.
	std::atomic<int> totalResults(0);

	// Slow producer: enqueues [startingPoint, startingPoint + numSlowInsertsPerThread),
	// sleeping after every insert so that consumers regularly find the queue empty.
	// NOTE(review): the unit of Sleep()'s argument is not visible in this file - confirm.
	void EnqueueSlow(int startingPoint)
	{
		for(int i = startingPoint; i < startingPoint + numSlowInsertsPerThread; ++i)
		{
			queueSlow->Enqueue(i);
			sprawl::this_thread::Sleep(100);
		}
	}

	// Consumer for the slow test: polls `queueSlow` until numSlowInsertsPerThread * 10 items
	// have been dequeued in total, backing off briefly whenever the queue is empty, then
	// merges its local per-value tallies into `results` while holding `mtx`.
	void DequeueSlow()
	{
		sprawl::collections::BasicHashMap<int, int> localResults;
		while(totalResults < numSlowInsertsPerThread * 10)
		{
			int i;
			if(!queueSlow->Dequeue(i))
			{
				sprawl::this_thread::Sleep(10);
				continue;
			}
			++localResults[i];
			++totalResults;
		}
		sprawl::threading::ScopedLock lock(mtx);
		for(auto& kvp : localResults)
		{
			results[kvp.Key()] += kvp.Value();
		}
	}

	// Verifies that with slow producers and polling consumers every value is dequeued
	// exactly once and that the queue is empty afterwards.
	TEST(ConcurrentQueue, WorksWhenDequeueFasterThanEnqueue)
	{
		results.Clear();

		sprawl::threading::Thread t1(EnqueueSlow, 0);
		sprawl::threading::Thread t2(EnqueueSlow, numSlowInsertsPerThread);
		sprawl::threading::Thread t3(EnqueueSlow, numSlowInsertsPerThread * 2);
		sprawl::threading::Thread t4(EnqueueSlow, numSlowInsertsPerThread * 3);
		sprawl::threading::Thread t5(EnqueueSlow, numSlowInsertsPerThread * 4);
		sprawl::threading::Thread t6(EnqueueSlow, numSlowInsertsPerThread * 5);
		sprawl::threading::Thread t7(EnqueueSlow, numSlowInsertsPerThread * 6);
		sprawl::threading::Thread t8(EnqueueSlow, numSlowInsertsPerThread * 7);
		sprawl::threading::Thread t9(EnqueueSlow, numSlowInsertsPerThread * 8);
		sprawl::threading::Thread t10(EnqueueSlow, numSlowInsertsPerThread * 9);
		sprawl::threading::Thread t11(DequeueSlow);
		sprawl::threading::Thread t12(DequeueSlow);
		sprawl::threading::Thread t13(DequeueSlow);
		sprawl::threading::Thread t14(DequeueSlow);
		sprawl::threading::Thread t15(DequeueSlow);
		sprawl::threading::Thread t16(DequeueSlow);
		sprawl::threading::Thread t17(DequeueSlow);
		sprawl::threading::Thread t18(DequeueSlow);
		sprawl::threading::Thread t19(DequeueSlow);
		sprawl::threading::Thread t20(DequeueSlow);

		t1.Start();
		t11.Start();
		t2.Start();
		t12.Start();
		t3.Start();
		t13.Start();
		t4.Start();
		t14.Start();
		t5.Start();
		t15.Start();
		t6.Start();
		t16.Start();
		t7.Start();
		t17.Start();
		t8.Start();
		t18.Start();
		t9.Start();
		t19.Start();
		t10.Start();
		t20.Start();

		t1.Join();
		t11.Join();
		t2.Join();
		t12.Join();
		t3.Join();
		t13.Join();
		t4.Join();
		t14.Join();
		t5.Join();
		t15.Join();
		t6.Join();
		t16.Join();
		t7.Join();
		t17.Join();
		t8.Join();
		t18.Join();
		t9.Join();
		t19.Join();
		t10.Join();
		t20.Join();

		for(int i = 0; i < numSlowInsertsPerThread * 10; ++i)
		{
			EXPECT_TRUE(results.Has(i)) << "Item " << i << " is missing.";
			if(results.Has(i))
			{
				ASSERT_EQ(1, results.Get(i)) << "Item " << i << " dequeued more than once.";
			}
		}

		// Initialised so the variable never holds an indeterminate value; it is only
		// printed when Dequeue() unexpectedly succeeds.
		int i = 0;
		ASSERT_FALSE(queueSlow->Dequeue(i)) << "Item " << i << " was still in the queue?";

		results.Clear();
		delete queueSlow;
	}
}
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#1 | 18645 | brandon_m_bare | Integrated latest version of libsprawl. | ||
//guest/ShadauxCat/Sprawl/Mainline/UnitTests/UnitTests_ConcurrentQueue.cpp | |||||
#1 | 16768 | ShadauxCat |
Improvements to error handling in builds with exceptions disabled: - In debug builds or with SPRAWL_ERRORSTATE_STRICT enabled, ErrorState will output a message to stderr and terminate if Get() is called when an error flag is set. (In release builds or with SPRAWL_ERRORSTATE_PERMISSIVE defined, Get() will return junk memory in this case.) - In debug builds or with SPRAWL_ERRORSTATE_STRICT enabled, ErrorState will output a message to stderr and terminate if its destructor is called without checking the errorstate if an error is present (equivalent to an exception terminating the application if no catch() block is present for it). - On linux builds and when running "Analyze" through visual studio, a warning will be issued if any function returning ErrorState has its return value ignored. (This only applies to builds with exceptions not enabled; when exceptions are enabled no warning is issued) - Many functions that could return ErrorState were having their return values silently ignored in internal sprawl code so the user would not find out about errors if exceptions are disabled; now anything in sprawl code that calls a function returning ErrorState will either handle the error, or (in most cases) surface it back up to the user. - As a positive side-effect of the warnings for ignoring ErrorState, several constructors that were capable of throwing exceptions are no longer capable of doing so. #review-16769 |