summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/block_actor.cpp14
-rw-r--r--lib/block_allocator.cpp6
-rw-r--r--lib/gras_impl/debug.hpp2
-rw-r--r--lib/input_handlers.cpp6
-rw-r--r--lib/output_handlers.cpp2
5 files changed, 19 insertions, 11 deletions
diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp
index 10c863e..750e52d 100644
--- a/lib/block_actor.cpp
+++ b/lib/block_actor.cpp
@@ -4,6 +4,7 @@
#include <gras_impl/block_actor.hpp>
#include <boost/thread/thread.hpp>
#include <Theron/Framework.h>
+#include <stdexcept>
using namespace gras;
@@ -14,6 +15,10 @@ ThreadPoolConfig::ThreadPoolConfig(void)
node_mask = 0;
processor_mask = 0xffffffff;
yield_strategy = "STRONG";
+
+ //environment variable override
+ const char * gras_yield = getenv("GRAS_YIELD");
+ if (gras_yield != NULL) yield_strategy = gras_yield;
}
/***********************************************************************
@@ -38,9 +43,12 @@ ThreadPool::ThreadPool(const ThreadPoolConfig &config)
config.processor_mask
);
- if (config.yield_strategy == "POLITE") params.mYieldStrategy = Theron::YIELD_STRATEGY_POLITE;
- if (config.yield_strategy == "STRONG") params.mYieldStrategy = Theron::YIELD_STRATEGY_STRONG;
- if (config.yield_strategy == "AGGRESSIVE") params.mYieldStrategy = Theron::YIELD_STRATEGY_AGGRESSIVE;
+ if (config.yield_strategy.empty()) params.mYieldStrategy = Theron::YIELD_STRATEGY_STRONG;
+ //else if (config.yield_strategy == "BLOCKING") params.mYieldStrategy = Theron::YIELD_STRATEGY_BLOCKING;
+ else if (config.yield_strategy == "POLITE") params.mYieldStrategy = Theron::YIELD_STRATEGY_POLITE;
+ else if (config.yield_strategy == "STRONG") params.mYieldStrategy = Theron::YIELD_STRATEGY_STRONG;
+ else if (config.yield_strategy == "AGGRESSIVE") params.mYieldStrategy = Theron::YIELD_STRATEGY_AGGRESSIVE;
+ else throw std::runtime_error("gras::ThreadPoolConfig yield_strategy unknown: " + config.yield_strategy);
this->reset(new Theron::Framework(Theron::Framework::Parameters(params)));
}
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index 3161653..8f42e57 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -7,9 +7,9 @@
using namespace gras;
-const size_t AT_LEAST_BYTES = 1 << 14;
-const size_t AHH_TOO_MANY_BYTES = 1 << 20; //TODO
-const size_t THIS_MANY_BUFFERS = 16;
+const size_t AT_LEAST_BYTES = 32*(1024); //kiB per buffer
+const size_t AHH_TOO_MANY_BYTES = 32*(1024*1024); //MiB enough for me
+const size_t THIS_MANY_BUFFERS = 8; //pool size
void BlockActor::buffer_returner(const size_t index, SBuffer &buffer)
{
diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp
index c01e7d4..7934a1e 100644
--- a/lib/gras_impl/debug.hpp
+++ b/lib/gras_impl/debug.hpp
@@ -24,7 +24,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc);
//----------------------------------------------------------------------
//-- define to enable these debugs:
//----------------------------------------------------------------------
-#define ASSERTING
+//#define ASSERTING
//#define MESSAGE_TRACING
//#define ITEM_CONSPROD
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index da16ba4..faddda8 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -12,7 +12,7 @@ void BlockActor::handle_input_tag(const InputTagMessage &message, const Theron::
const size_t index = message.index;
//handle incoming stream tag, push into the tag storage
- if (this->block_state == BLOCK_STATE_DONE) return;
+ if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return;
this->input_tags[index].push_back(message.tag);
this->input_tags_changed[index] = true;
}
@@ -24,7 +24,7 @@ void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron::
const size_t index = message.index;
//handle incoming async message, push into the msg storage
- if (this->block_state == BLOCK_STATE_DONE) return;
+ if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return;
this->input_msgs[index].push_back(message.msg);
this->update_input_avail(index);
@@ -39,7 +39,7 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th
const size_t index = message.index;
//handle incoming stream buffer, push into the queue
- if (this->block_state == BLOCK_STATE_DONE) return;
+ if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return;
this->input_queues.push(index, message.buffer);
this->update_input_avail(index);
diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp
index 3c2f971..d937bbf 100644
--- a/lib/output_handlers.cpp
+++ b/lib/output_handlers.cpp
@@ -14,7 +14,7 @@ void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const
//a buffer has returned from the downstream
//(all interested consumers have finished with it)
- if (this->block_state == BLOCK_STATE_DONE) return;
+ if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return;
this->output_queues.push(index, message.buffer);
ta.done();