summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-09-02 20:01:21 -0700
committerJosh Blum2012-09-02 20:01:21 -0700
commit31a786ff101ef66cf9a344df0efc3cb01db59da5 (patch)
treed0af1f8646d5448898a25a4cbac110fb78f7bbfe /lib
parentfb5c7144a4e9145612fd610fe5d9d2ede7697cc7 (diff)
downloadsandhi-31a786ff101ef66cf9a344df0efc3cb01db59da5.tar.gz
sandhi-31a786ff101ef66cf9a344df0efc3cb01db59da5.tar.bz2
sandhi-31a786ff101ef66cf9a344df0efc3cb01db59da5.zip
history: output offset and holds for input consumption
Diffstat (limited to 'lib')
-rw-r--r--lib/block_handlers.cpp1
-rw-r--r--lib/block_task.cpp24
-rw-r--r--lib/element_impl.hpp1
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp6
4 files changed, 24 insertions, 8 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index be53fe6..1a48ffb 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -115,6 +115,7 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
this->produce_items.resize(num_outputs, 0);
this->input_queues.resize(num_inputs);
this->output_queues.resize(num_outputs);
+ this->output_bytes_offset.resize(num_outputs, 0);
this->input_tokens.resize(num_inputs);
this->output_tokens.resize(num_outputs);
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 59be851..e436bc1 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -120,8 +120,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
ASSERT(this->output_queues.ready(i));
const tsbe::Buffer &buff = this->output_queues.front(i);
- char *mem = ((char *)buff.get_memory());
- const size_t bytes = buff.get_length();
+ char *mem = ((char *)buff.get_memory()) + this->output_bytes_offset[i];
+ const size_t bytes = buff.get_length() - this->output_bytes_offset[i];
const size_t items = bytes/this->output_items_sizes[i];
this->work_io_ptr_mask |= ptrdiff_t(mem);
@@ -162,6 +162,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//------------------------------------------------------------------
//-- process input consumption
//------------------------------------------------------------------
+ bool input_fully_consumed = true;
for (size_t i = 0; i < num_inputs; i++)
{
ASSERT(enable_fixed_rate or ret != Block::WORK_CALLED_PRODUCE);
@@ -170,7 +171,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
- this->input_queues.pop(i, bytes);
+ input_fully_consumed = input_fully_consumed and this->input_queues.pop(i, bytes);
}
//------------------------------------------------------------------
@@ -183,10 +184,19 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->items_produced[i] += items;
const size_t bytes = items*this->output_items_sizes[i];
- tsbe::Buffer &buff = this->output_queues.front(i);
- buff.get_length() = bytes;
- task_iface.post_downstream(i, buff);
- this->output_queues.pop(i);
+ this->output_bytes_offset[i] += bytes;
+
+ //only pass output buffer downstream when the input is fully consumed...
+ //Reasoning: For the sake of dealling with history, we can process the mini history input buffer,
+ //and then call work again on the real input buffer, but still yield one output buffer per input buffer.
+ if (input_fully_consumed)
+ {
+ tsbe::Buffer &buff = this->output_queues.front(i);
+ buff.get_length() = this->output_bytes_offset[i];
+ task_iface.post_downstream(i, buff);
+ this->output_queues.pop(i);
+ this->output_bytes_offset[i] = 0;
+ }
}
//------------------------------------------------------------------
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index b78165c..6ecbbbd 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -84,6 +84,7 @@ struct ElementImpl
//buffer queues and ready conditions
InputBufferQueues input_queues;
VectorOfQueues<tsbe::Buffer> output_queues;
+ std::vector<size_t> output_bytes_offset;
//tag tracking
std::vector<bool> input_tags_changed;
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 4faa690..3075348 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -120,8 +120,10 @@ struct InputBufferQueues
* Otherwise, check if the input buffer was entirely consumed.
* If so, pop the input buffer, copy the tail end of the buffer
* into the mini history buffer, and reset the offset condition.
+ *
+ * \return true if the buffer is fully consumed
*/
- inline void pop(const size_t i, const size_t bytes_consumed)
+ inline bool pop(const size_t i, const size_t bytes_consumed)
{
_offset_bytes[i] += bytes_consumed + _history_bytes[i];
@@ -138,7 +140,9 @@ struct InputBufferQueues
_queues[i].pop();
_bitset.set(i, not _queues[i].empty());
_offset_bytes[i] = 0;
+ return true;
}
+ return false;
}
inline void resize(const size_t size)