diff options
author | Josh Blum | 2012-11-25 01:35:32 -0800 |
---|---|---|
committer | Josh Blum | 2012-11-25 01:35:32 -0800 |
commit | f31ffe88efd4ef2e6f262fbda39dba44d3bb0925 (patch) | |
tree | a35d25766bbd42df43a8aa78e566bebcb03027ed /lib | |
parent | c5162cd3e2f21888b82ec6d4231ccef1d4b39e30 (diff) | |
download | sandhi-f31ffe88efd4ef2e6f262fbda39dba44d3bb0925.tar.gz sandhi-f31ffe88efd4ef2e6f262fbda39dba44d3bb0925.tar.bz2 sandhi-f31ffe88efd4ef2e6f262fbda39dba44d3bb0925.zip |
front buffer accumulate on reserve fail
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block_task.cpp | 3 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 25 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 2 |
3 files changed, 23 insertions, 7 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 8b48b61..c3e93d0 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -62,7 +62,7 @@ void BlockActor::input_fail(const size_t i) //input failed, accumulate and try again if (not this->input_queues.is_accumulated(i)) { - this->input_queues.accumulate(i, this->input_items_sizes[i]); + this->input_queues.accumulate(i); this->Push(SelfKickMessage(), Theron::Address()); return; } @@ -143,7 +143,6 @@ void BlockActor::handle_task(void) this->sort_tags(i); ASSERT(this->input_queues.ready(i)); - //this->input_queues.accumulate(i, this->input_items_sizes[i]); const SBuffer &buff = this->input_queues.front(i); const void *mem = buff.get(); size_t items = buff.length/this->input_items_sizes[i]; diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index adc4edd..aa097ff 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -42,7 +42,7 @@ struct InputBufferQueues this->resize(0); } - void update_config(const size_t i, const size_t, const size_t, const size_t); + void update_config(const size_t i, const size_t, const size_t, const size_t, const size_t); //! Call to get an input buffer for work GRAS_FORCE_INLINE const SBuffer &front(const size_t i) @@ -52,6 +52,13 @@ struct InputBufferQueues //special case when the null buffer is possible if (_queues[i].empty()) return get_null_buff(); + //there are enough enqueued bytes, but not in the front buffer + if (_queues[i].front().length < _reserve_bytes[i]) + { + this->accumulate(i); + } + ASSERT(_queues[i].front().length >= _reserve_bytes[i]); + return _queues[i].front(); } @@ -60,7 +67,7 @@ struct InputBufferQueues void resize(const size_t size); - void accumulate(const size_t i, const size_t item_size); + void accumulate(const size_t i); /*! * Can we consider this queue's buffers to be accumulated? @@ -140,6 +147,7 @@ struct InputBufferQueues } BitSet _bitset; + std::vector<size_t> _items_sizes; std::vector<size_t> _enqueued_bytes; std::vector<size_t> _reserve_bytes; std::vector<size_t> _maximum_bytes; @@ -152,6 +160,7 @@ struct InputBufferQueues GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size) { _bitset.resize(size); + _items_sizes.resize(size, 0); _enqueued_bytes.resize(size, 0); _reserve_bytes.resize(size, 1); _maximum_bytes.resize(size, MAX_AUX_BUFF_BYTES); @@ -163,11 +172,14 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size) inline void InputBufferQueues::update_config( const size_t i, + const size_t item_size, const size_t hist_bytes, const size_t reserve_bytes, const size_t maximum_bytes ) { + _items_sizes[i] = item_size; + //first allocate the aux buffer if (maximum_bytes != 0) _maximum_bytes[i] = maximum_bytes; _maximum_bytes[i] = std::max(_maximum_bytes[i], reserve_bytes); @@ -208,16 +220,21 @@ inline void InputBufferQueues::update_config( this->__update(i); } -GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i, const size_t item_size) +GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i) { + if (_aux_queues[i]->empty()) + { + _aux_queues[i]->allocate_one(_maximum_bytes[i]); + } ASSERT(not _aux_queues[i]->empty()); + SBuffer accum_buff = _aux_queues[i]->front(); _aux_queues[i]->pop(); accum_buff.offset = 0; accum_buff.length = 0; size_t free_bytes = accum_buff.get_actual_length(); - free_bytes /= item_size; free_bytes *= item_size; + free_bytes /= _items_sizes[i]; free_bytes *= _items_sizes[i]; while (not _queues[i].empty() and free_bytes != 0) { diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index 5cd2f79..df8d445 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -95,6 +95,6 @@ void BlockActor::handle_update_inputs( const size_t hist_bytes = this->input_items_sizes[i]*this->input_configs[i].lookahead_items; const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items; const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items; - this->input_queues.update_config(i, hist_bytes, reserve_bytes, maximum_bytes); + this->input_queues.update_config(i, this->input_items_sizes[i], hist_bytes, reserve_bytes, maximum_bytes); } } |