summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-11-25 01:35:32 -0800
committerJosh Blum2012-11-25 01:35:32 -0800
commitf31ffe88efd4ef2e6f262fbda39dba44d3bb0925 (patch)
treea35d25766bbd42df43a8aa78e566bebcb03027ed /lib
parentc5162cd3e2f21888b82ec6d4231ccef1d4b39e30 (diff)
downloadsandhi-f31ffe88efd4ef2e6f262fbda39dba44d3bb0925.tar.gz
sandhi-f31ffe88efd4ef2e6f262fbda39dba44d3bb0925.tar.bz2
sandhi-f31ffe88efd4ef2e6f262fbda39dba44d3bb0925.zip
front buffer accumulate on reserve fail
Diffstat (limited to 'lib')
-rw-r--r--lib/block_task.cpp3
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp25
-rw-r--r--lib/topology_handler.cpp2
3 files changed, 23 insertions, 7 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 8b48b61..c3e93d0 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -62,7 +62,7 @@ void BlockActor::input_fail(const size_t i)
//input failed, accumulate and try again
if (not this->input_queues.is_accumulated(i))
{
- this->input_queues.accumulate(i, this->input_items_sizes[i]);
+ this->input_queues.accumulate(i);
this->Push(SelfKickMessage(), Theron::Address());
return;
}
@@ -143,7 +143,6 @@ void BlockActor::handle_task(void)
this->sort_tags(i);
ASSERT(this->input_queues.ready(i));
- //this->input_queues.accumulate(i, this->input_items_sizes[i]);
const SBuffer &buff = this->input_queues.front(i);
const void *mem = buff.get();
size_t items = buff.length/this->input_items_sizes[i];
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index adc4edd..aa097ff 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -42,7 +42,7 @@ struct InputBufferQueues
this->resize(0);
}
- void update_config(const size_t i, const size_t, const size_t, const size_t);
+ void update_config(const size_t i, const size_t, const size_t, const size_t, const size_t);
//! Call to get an input buffer for work
GRAS_FORCE_INLINE const SBuffer &front(const size_t i)
@@ -52,6 +52,13 @@ struct InputBufferQueues
//special case when the null buffer is possible
if (_queues[i].empty()) return get_null_buff();
+ //there are enough enqueued bytes, but not in the front buffer
+ if (_queues[i].front().length < _reserve_bytes[i])
+ {
+ this->accumulate(i);
+ }
+ ASSERT(_queues[i].front().length >= _reserve_bytes[i]);
+
return _queues[i].front();
}
@@ -60,7 +67,7 @@ struct InputBufferQueues
void resize(const size_t size);
- void accumulate(const size_t i, const size_t item_size);
+ void accumulate(const size_t i);
/*!
* Can we consider this queue's buffers to be accumulated?
@@ -140,6 +147,7 @@ struct InputBufferQueues
}
BitSet _bitset;
+ std::vector<size_t> _items_sizes;
std::vector<size_t> _enqueued_bytes;
std::vector<size_t> _reserve_bytes;
std::vector<size_t> _maximum_bytes;
@@ -152,6 +160,7 @@ struct InputBufferQueues
GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size)
{
_bitset.resize(size);
+ _items_sizes.resize(size, 0);
_enqueued_bytes.resize(size, 0);
_reserve_bytes.resize(size, 1);
_maximum_bytes.resize(size, MAX_AUX_BUFF_BYTES);
@@ -163,11 +172,14 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size)
inline void InputBufferQueues::update_config(
const size_t i,
+ const size_t item_size,
const size_t hist_bytes,
const size_t reserve_bytes,
const size_t maximum_bytes
)
{
+ _items_sizes[i] = item_size;
+
//first allocate the aux buffer
if (maximum_bytes != 0) _maximum_bytes[i] = maximum_bytes;
_maximum_bytes[i] = std::max(_maximum_bytes[i], reserve_bytes);
@@ -208,16 +220,21 @@ inline void InputBufferQueues::update_config(
this->__update(i);
}
-GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i, const size_t item_size)
+GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i)
{
+ if (_aux_queues[i]->empty())
+ {
+ _aux_queues[i]->allocate_one(_maximum_bytes[i]);
+ }
ASSERT(not _aux_queues[i]->empty());
+
SBuffer accum_buff = _aux_queues[i]->front();
_aux_queues[i]->pop();
accum_buff.offset = 0;
accum_buff.length = 0;
size_t free_bytes = accum_buff.get_actual_length();
- free_bytes /= item_size; free_bytes *= item_size;
+ free_bytes /= _items_sizes[i]; free_bytes *= _items_sizes[i];
while (not _queues[i].empty() and free_bytes != 0)
{
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index 5cd2f79..df8d445 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -95,6 +95,6 @@ void BlockActor::handle_update_inputs(
const size_t hist_bytes = this->input_items_sizes[i]*this->input_configs[i].lookahead_items;
const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items;
const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items;
- this->input_queues.update_config(i, hist_bytes, reserve_bytes, maximum_bytes);
+ this->input_queues.update_config(i, this->input_items_sizes[i], hist_bytes, reserve_bytes, maximum_bytes);
}
}