diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block_actor.cpp | 14 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 6 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 2 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 6 | ||||
-rw-r--r-- | lib/output_handlers.cpp | 2 |
5 files changed, 19 insertions, 11 deletions
diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp index 10c863e..750e52d 100644 --- a/lib/block_actor.cpp +++ b/lib/block_actor.cpp @@ -4,6 +4,7 @@ #include <gras_impl/block_actor.hpp> #include <boost/thread/thread.hpp> #include <Theron/Framework.h> +#include <stdexcept> using namespace gras; @@ -14,6 +15,10 @@ ThreadPoolConfig::ThreadPoolConfig(void) node_mask = 0; processor_mask = 0xffffffff; yield_strategy = "STRONG"; + + //environment variable override + const char * gras_yield = getenv("GRAS_YIELD"); + if (gras_yield != NULL) yield_strategy = gras_yield; } /*********************************************************************** @@ -38,9 +43,12 @@ ThreadPool::ThreadPool(const ThreadPoolConfig &config) config.processor_mask ); - if (config.yield_strategy == "POLITE") params.mYieldStrategy = Theron::YIELD_STRATEGY_POLITE; - if (config.yield_strategy == "STRONG") params.mYieldStrategy = Theron::YIELD_STRATEGY_STRONG; - if (config.yield_strategy == "AGGRESSIVE") params.mYieldStrategy = Theron::YIELD_STRATEGY_AGGRESSIVE; + if (config.yield_strategy.empty()) params.mYieldStrategy = Theron::YIELD_STRATEGY_STRONG; + //else if (config.yield_strategy == "BLOCKING") params.mYieldStrategy = Theron::YIELD_STRATEGY_BLOCKING; + else if (config.yield_strategy == "POLITE") params.mYieldStrategy = Theron::YIELD_STRATEGY_POLITE; + else if (config.yield_strategy == "STRONG") params.mYieldStrategy = Theron::YIELD_STRATEGY_STRONG; + else if (config.yield_strategy == "AGGRESSIVE") params.mYieldStrategy = Theron::YIELD_STRATEGY_AGGRESSIVE; + else throw std::runtime_error("gras::ThreadPoolConfig yield_strategy unknown: " + config.yield_strategy); this->reset(new Theron::Framework(Theron::Framework::Parameters(params))); } diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index 3161653..8f42e57 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -7,9 +7,9 @@ using namespace gras; -const size_t AT_LEAST_BYTES = 1 << 14; -const size_t AHH_TOO_MANY_BYTES = 1 << 20; //TODO -const size_t THIS_MANY_BUFFERS = 16; +const size_t AT_LEAST_BYTES = 32*(1024); //kiB per buffer +const size_t AHH_TOO_MANY_BYTES = 32*(1024*1024); //MiB enough for me +const size_t THIS_MANY_BUFFERS = 8; //pool size void BlockActor::buffer_returner(const size_t index, SBuffer &buffer) { diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index c01e7d4..7934a1e 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -24,7 +24,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc); //---------------------------------------------------------------------- //-- define to enable these debugs: //---------------------------------------------------------------------- -#define ASSERTING +//#define ASSERTING //#define MESSAGE_TRACING //#define ITEM_CONSPROD diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index da16ba4..faddda8 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -12,7 +12,7 @@ void BlockActor::handle_input_tag(const InputTagMessage &message, const Theron:: const size_t index = message.index; //handle incoming stream tag, push into the tag storage - if (this->block_state == BLOCK_STATE_DONE) return; + if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; this->input_tags[index].push_back(message.tag); this->input_tags_changed[index] = true; } @@ -24,7 +24,7 @@ void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron:: const size_t index = message.index; //handle incoming async message, push into the msg storage - if (this->block_state == BLOCK_STATE_DONE) return; + if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; this->input_msgs[index].push_back(message.msg); this->update_input_avail(index); @@ -39,7 +39,7 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th const size_t index = message.index; //handle incoming stream buffer, push into the queue - if (this->block_state == BLOCK_STATE_DONE) return; + if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; this->input_queues.push(index, message.buffer); this->update_input_avail(index); diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp index 3c2f971..d937bbf 100644 --- a/lib/output_handlers.cpp +++ b/lib/output_handlers.cpp @@ -14,7 +14,7 @@ void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const //a buffer has returned from the downstream //(all interested consumers have finished with it) - if (this->block_state == BLOCK_STATE_DONE) return; + if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; this->output_queues.push(index, message.buffer); ta.done(); |