summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-11-07 00:40:41 -0800
committerJosh Blum2012-11-07 00:40:41 -0800
commit0f0c68c4a8c32c43eee8f1589878abe376e2b1da (patch)
tree165014e95b5fa5fda8bcaf95bb7af6b2b99d7f02 /lib
parent12c56a0845a7b8e5cbff6ed20eae8b3fa3e26b2d (diff)
downloadsandhi-0f0c68c4a8c32c43eee8f1589878abe376e2b1da.tar.gz
sandhi-0f0c68c4a8c32c43eee8f1589878abe376e2b1da.tar.bz2
sandhi-0f0c68c4a8c32c43eee8f1589878abe376e2b1da.zip
io fails and io config work
Diffstat (limited to 'lib')
-rw-r--r--lib/block.cpp2
-rw-r--r--lib/block_task.cpp36
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp38
-rw-r--r--lib/topology_handler.cpp5
4 files changed, 62 insertions, 19 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 2235f72..578b03e 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -21,6 +21,8 @@ using namespace gras;
InputPortConfig::InputPortConfig(void)
{
+ reserve_items = 1;
+ maximum_items = 0;
inline_buffer = false;
lookahead_items = 0;
}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 8546731..cfe5bb0 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -73,6 +73,15 @@ void BlockActor::mark_done(void)
void BlockActor::input_fail(const size_t i)
{
+ SBuffer &buff = this->input_queues.front(i);
+
+ //check that the input is not already maxed
+ const size_t front_items = buff.length/this->input_items_sizes[i];
+ if (front_items >= this->input_configs[i].maximum_items)
+ {
+ //throw std::runtime_error("input_fail called on maximum_items buffer");
+ }
+
//input failed, accumulate and try again
if (not this->input_queues.is_accumulated(i))
{
@@ -80,15 +89,29 @@ void BlockActor::input_fail(const size_t i)
this->Push(SelfKickMessage(), Theron::Address());
return;
}
+
//otherwise check for done, else wait for more
if (this->inputs_done[i]) this->mark_done();
-
- //TODO check if input buffer is max size and throw
}
void BlockActor::output_fail(const size_t i)
{
- //TODO
+ SBuffer &buff = this->output_queues.front(i);
+
+ //check that the input is not already maxed
+ const size_t front_items = buff.length/this->output_items_sizes[i];
+ if (front_items >= this->output_configs[i].maximum_items)
+ {
+ throw std::runtime_error("output_fail called on maximum_items buffer");
+ }
+
+ if (buff.length != 0)
+ {
+ InputBufferMessage buff_msg;
+ buff_msg.buffer = buff;
+ this->post_downstream(i, buff_msg);
+ this->output_queues.pop(i);
+ }
}
void BlockActor::handle_task(void)
@@ -178,9 +201,14 @@ void BlockActor::handle_task(void)
{
if (not this->output_queues.ready(i)) continue;
SBuffer &buff = this->output_queues.front(i);
+ const size_t reserve_bytes = this->output_configs[i].reserve_items/this->output_items_sizes[i];
//dont always pass output buffers downstream for the sake of efficiency
- if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length())
+ if (
+ not this->input_queues.all_ready() or
+ buff.length*2 > buff.get_actual_length() or
+ (buff.get_actual_length() - buff.length) < reserve_bytes
+ )
{
InputBufferMessage buff_msg;
buff_msg.buffer = buff;
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 92bfb02..b851003 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -40,7 +40,7 @@ struct InputBufferQueues
this->resize(0);
}
- void update_history_bytes(const size_t i, const size_t hist_bytes);
+ void update_config(const size_t i, const size_t, const size_t, const size_t);
//! Call to get an input buffer for work
GRAS_FORCE_INLINE SBuffer &front(const size_t i)
@@ -114,11 +114,12 @@ struct InputBufferQueues
GRAS_FORCE_INLINE void __update(const size_t i)
{
- _bitset.set(i, _enqueued_bytes[i] != 0);
+ _bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i]);
}
BitSet _bitset;
std::vector<size_t> _enqueued_bytes;
+ std::vector<size_t> _reserve_bytes;
std::vector<boost::circular_buffer<SBuffer> > _queues;
std::vector<size_t> _history_bytes;
std::vector<boost::shared_ptr<BufferQueue> > _aux_queues;
@@ -129,24 +130,33 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size)
{
_bitset.resize(size);
_enqueued_bytes.resize(size, 0);
+ _reserve_bytes.resize(size, 1);
_queues.resize(size, boost::circular_buffer<SBuffer>(MAX_QUEUE_SIZE));
_history_bytes.resize(size, 0);
_aux_queues.resize(size);
- for (size_t i = 0; i < this->size(); i++)
- {
- if (_aux_queues[i]) continue;
- _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue());
- _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
- _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
- _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
- _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
- }
-
}
-inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t hist_bytes)
+inline void InputBufferQueues::update_config(
+ const size_t i,
+ const size_t hist_bytes,
+ const size_t reserve_bytes,
+ size_t maximum_bytes
+)
{
+ //first allocate the aux buffer
+ if (maximum_bytes == 0) maximum_bytes = MAX_AUX_BUFF_BYTES;
+ if (
+ not _aux_queues[i] or
+ _aux_queues[i]->empty() or
+ _aux_queues[i]->front().get_actual_length() != maximum_bytes
+ ){
+ _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue());
+ _aux_queues[i]->allocate_one(maximum_bytes);
+ _aux_queues[i]->allocate_one(maximum_bytes);
+ _aux_queues[i]->allocate_one(maximum_bytes);
+ }
+
//there is history, so enqueue some initial history
if (hist_bytes > _history_bytes[i])
{
@@ -169,6 +179,8 @@ inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t
}
_history_bytes[i] = hist_bytes;
+ _reserve_bytes[i] = reserve_bytes;
+ this->__update(i);
}
GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i, const size_t item_size)
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index 9261af1..9a1f243 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -102,10 +102,11 @@ void BlockActor::handle_update_inputs(
const size_t num_inputs = this->get_num_inputs();
this->input_queues.resize(num_inputs);
- //impose input reserve requirements based on relative rate and output multiple
for (size_t i = 0; i < num_inputs; i++)
{
const size_t hist_bytes = this->input_items_sizes[i]*this->input_configs[i].lookahead_items;
- this->input_queues.update_history_bytes(i, hist_bytes);
+ const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items;
+ const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items;
+ this->input_queues.update_config(i, hist_bytes, reserve_bytes, maximum_bytes);
}
}