diff options
author | Josh Blum | 2012-10-28 11:40:59 -0700 |
---|---|---|
committer | Josh Blum | 2012-10-28 11:40:59 -0700 |
commit | cbcedb10e8f59ac696478460c21150b811e8d083 (patch) | |
tree | 81fdae1d565fad7c8232a39538dee7630aa131d4 | |
parent | 25ec793e687821bd2ac5862fcf934c4c9c0f47b6 (diff) | |
parent | ebd812fd4702cbe5c9ef487f6efc8a4e0eb165e9 (diff) | |
download | sandhi-cbcedb10e8f59ac696478460c21150b811e8d083.tar.gz sandhi-cbcedb10e8f59ac696478460c21150b811e8d083.tar.bz2 sandhi-cbcedb10e8f59ac696478460c21150b811e8d083.zip |
Merge branch 'rc0_work'
Conflicts:
Apology
m--------- | Apology | 0 | ||||
m--------- | Theron | 0 | ||||
-rw-r--r-- | include/gnuradio/thread_pool.hpp | 41 | ||||
-rw-r--r-- | lib/CMakeLists.txt | 5 | ||||
-rw-r--r-- | lib/block_actor.cpp | 38 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 27 | ||||
-rw-r--r-- | lib/register_messages.cpp | 39 | ||||
-rw-r--r-- | lib/theron_allocator.cpp | 6 |
8 files changed, 103 insertions, 53 deletions
diff --git a/Apology b/Apology -Subproject 7fabc009441ef1bb7396e71d927c02ce0e86276 +Subproject ed35a35f10610514dec4a81d66e27d804562c90 diff --git a/Theron b/Theron -Subproject a18a310781e736ccc3272a18c810275b24e27e9 +Subproject 93289940a146bf0338ed5a48a0bd19b17c2b339 diff --git a/include/gnuradio/thread_pool.hpp b/include/gnuradio/thread_pool.hpp index c0adc1c..d34c161 100644 --- a/include/gnuradio/thread_pool.hpp +++ b/include/gnuradio/thread_pool.hpp @@ -20,6 +20,7 @@ #include <gnuradio/gras.hpp> #include <boost/shared_ptr.hpp> #include <boost/weak_ptr.hpp> +#include <string> //! ThreadPool is an unexposed Theron Framework //! Forward declare the Framwork for c++ users @@ -31,6 +32,38 @@ namespace Theron namespace gnuradio { +struct GRAS_API ThreadPoolConfig +{ + ThreadPoolConfig(void); + + /*! + * The initial number of worker threads to create within the framework. + * Default is the number of CPUs on the system. + */ + size_t thread_count; + + /*! + * Specifies the NUMA processor nodes upon which the framework may execute. + * Default is all NUMA nodes on the system. + */ + size_t node_mask; + + /*! + * Specifies the subset of the processors in each NUMA processor node upon which the framework may execute. + * Default is all CPUs per NUMA node. + */ + size_t processor_mask; + + /*! + * Yield strategy employed by the worker threads in the framework. + * POLITE, ///< Threads go to sleep when not in use. + * STRONG, ///< Threads yield to other threads but don't go to sleep. + * AGGRESSIVE ///< Threads never yield to other threads. + * Default is STRONG. + */ + std::string yield_strategy; +}; + /*! * Thread Pool is is a this wrapper of Theron Framework, see link for more details: * http://docs.theron-library.com/5.00/structTheron_1_1Framework_1_1Parameters.html @@ -44,13 +77,7 @@ struct GRAS_API ThreadPool : boost::shared_ptr<Theron::Framework> ThreadPool(boost::weak_ptr<Theron::Framework> p); //! Create a new thread pool with parameters - ThreadPool(const unsigned long threadCount); - - //! Create a new thread pool with parameters - ThreadPool(const unsigned long threadCount, const unsigned long nodeMask); - - //! Create a new thread pool with parameters - ThreadPool(const unsigned long threadCount, const unsigned long nodeMask, const unsigned long processorMask); + ThreadPool(const ThreadPoolConfig &config); /*! * When a block is created, it will execute in the active pool. diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 6081bb6..131c00a 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -11,10 +11,6 @@ if(CMAKE_BUILD_TYPE STREQUAL "Debug") add_definitions(-DGRAS_DEBUG) endif() -list(APPEND gnuradio_core_sources - ${CMAKE_CURRENT_SOURCE_DIR}/register_messages.cpp #exports messages, must be first -) - ######################################################################## # Setup Theron Deps ######################################################################## @@ -58,6 +54,7 @@ list(APPEND gnuradio_core_sources ${CMAKE_CURRENT_SOURCE_DIR}/gr_sync_block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2.cpp ${CMAKE_CURRENT_SOURCE_DIR}/gr_top_block.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/register_messages.cpp ) #sources that are in tree that have not changed diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp index e522624..108e7d3 100644 --- a/lib/block_actor.cpp +++ b/lib/block_actor.cpp @@ -21,6 +21,15 @@ using namespace gnuradio; +ThreadPoolConfig::ThreadPoolConfig(void) +{ + thread_count = boost::thread::hardware_concurrency(); + thread_count = std::max(size_t(2), thread_count); + node_mask = 0; + processor_mask = 0xffffffff; + yield_strategy = "STRONG"; +} + /*********************************************************************** * Thread pool implementation **********************************************************************/ @@ -35,26 +44,19 @@ ThreadPool::ThreadPool(boost::weak_ptr<Theron::Framework> p): //NOP } -const size_t default_concurrency(void) -{ - const size_t n = boost::thread::hardware_concurrency(); - return std::max(size_t(2), n); -} - -ThreadPool::ThreadPool(unsigned long threadCount) +ThreadPool::ThreadPool(const ThreadPoolConfig &config) { - if (threadCount == 0) threadCount = default_concurrency(); - this->reset(new Theron::Framework(Theron::Framework::Parameters(threadCount, 0))); -} + Theron::Framework::Parameters params( + config.thread_count, + config.node_mask, + config.processor_mask + ); -ThreadPool::ThreadPool(const unsigned long threadCount, const unsigned long nodeMask) -{ - this->reset(new Theron::Framework(Theron::Framework::Parameters(threadCount, nodeMask))); -} + if (config.yield_strategy == "POLITE") params.mYieldStrategy = Theron::Framework::YIELD_STRATEGY_POLITE; + if (config.yield_strategy == "STRONG") params.mYieldStrategy = Theron::Framework::YIELD_STRATEGY_STRONG; + if (config.yield_strategy == "AGGRESSIVE") params.mYieldStrategy = Theron::Framework::YIELD_STRATEGY_AGGRESSIVE; -ThreadPool::ThreadPool(const unsigned long threadCount, const unsigned long nodeMask, const unsigned long processorMask) -{ - this->reset(new Theron::Framework(Theron::Framework::Parameters(threadCount, nodeMask, processorMask))); + this->reset(new Theron::Framework(Theron::Framework::Parameters(params))); } /*********************************************************************** @@ -73,7 +75,7 @@ static ThreadPool get_active_thread_pool(void) { if (not weak_framework.lock()) { - active_thread_pool = ThreadPool(0); + active_thread_pool = ThreadPool(ThreadPoolConfig()); active_thread_pool.set_active(); std::cout << "Created default thread pool with " << active_thread_pool->GetNumThreads() << " threads." << std::endl; } diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index ef87398..68c3392 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -139,4 +139,31 @@ struct UpdateInputsMessage } //namespace gnuradio +#include <Theron/Register.h> +#include <gnuradio/top_block.hpp> +#include <gras_impl/messages.hpp> +#include <gras_impl/interruptible_thread.hpp> + +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::TopAllocMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::TopActiveMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::TopInertMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::TopTokenMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::GlobalBlockConfig); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::SharedThreadGroup); + +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::InputTagMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::InputBufferMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::InputTokenMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::InputCheckMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::InputAllocMessage); + +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::OutputBufferMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::OutputTokenMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::OutputCheckMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::OutputHintMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::OutputAllocMessage); + +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::SelfKickMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gnuradio::UpdateInputsMessage); + #endif /*INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP*/ diff --git a/lib/register_messages.cpp b/lib/register_messages.cpp index 7c894fa..4d774ea 100644 --- a/lib/register_messages.cpp +++ b/lib/register_messages.cpp @@ -14,31 +14,28 @@ // You should have received a copy of the GNU Lesser General Public License // along with io_sig program. If not, see <http://www.gnu.org/licenses/>. -#include <Theron/Register.h> #include <gnuradio/top_block.hpp> #include <gras_impl/messages.hpp> #include <gras_impl/interruptible_thread.hpp> -#include <Apology/Worker.hpp> -THERON_REGISTER_MESSAGE(Apology::WorkerTopologyMessage); -THERON_REGISTER_MESSAGE(gnuradio::TopAllocMessage); -THERON_REGISTER_MESSAGE(gnuradio::TopActiveMessage); -THERON_REGISTER_MESSAGE(gnuradio::TopInertMessage); -THERON_REGISTER_MESSAGE(gnuradio::TopTokenMessage); -THERON_REGISTER_MESSAGE(gnuradio::GlobalBlockConfig); -THERON_REGISTER_MESSAGE(gnuradio::SharedThreadGroup); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::TopAllocMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::TopActiveMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::TopInertMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::TopTokenMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::GlobalBlockConfig); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::SharedThreadGroup); -THERON_REGISTER_MESSAGE(gnuradio::InputTagMessage); -THERON_REGISTER_MESSAGE(gnuradio::InputBufferMessage); -THERON_REGISTER_MESSAGE(gnuradio::InputTokenMessage); -THERON_REGISTER_MESSAGE(gnuradio::InputCheckMessage); -THERON_REGISTER_MESSAGE(gnuradio::InputAllocMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::InputTagMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::InputBufferMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::InputTokenMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::InputCheckMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::InputAllocMessage); -THERON_REGISTER_MESSAGE(gnuradio::OutputBufferMessage); -THERON_REGISTER_MESSAGE(gnuradio::OutputTokenMessage); -THERON_REGISTER_MESSAGE(gnuradio::OutputCheckMessage); -THERON_REGISTER_MESSAGE(gnuradio::OutputHintMessage); -THERON_REGISTER_MESSAGE(gnuradio::OutputAllocMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::OutputBufferMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::OutputTokenMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::OutputCheckMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::OutputHintMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::OutputAllocMessage); -THERON_REGISTER_MESSAGE(gnuradio::SelfKickMessage); -THERON_REGISTER_MESSAGE(gnuradio::UpdateInputsMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::SelfKickMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gnuradio::UpdateInputsMessage); diff --git a/lib/theron_allocator.cpp b/lib/theron_allocator.cpp index 23e0fd6..3158159 100644 --- a/lib/theron_allocator.cpp +++ b/lib/theron_allocator.cpp @@ -29,7 +29,7 @@ #include <Theron/AllocatorManager.h> #include <boost/circular_buffer.hpp> -#define MY_ALLOCATOR_CHUNK_SIZE 96 //Theron asks for a lot of 88-byte buffers +#define MY_ALLOCATOR_CHUNK_SIZE 256 #define MY_ALLOCATOR_POOL_SIZE (MY_ALLOCATOR_CHUNK_SIZE * (1 << 16)) static struct WorkerAllocator : Theron::IAllocator @@ -71,7 +71,7 @@ static struct WorkerAllocator : Theron::IAllocator else { //std::cout << "malloc size " << size << std::endl; - return std::malloc(size); + return new char[size]; } } @@ -86,7 +86,7 @@ static struct WorkerAllocator : Theron::IAllocator } else { - std::free(memory); + delete [] ((char *)memory); } } |