diff options
author | Josh Blum | 2012-09-02 20:01:21 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-02 20:01:21 -0700 |
commit | 31a786ff101ef66cf9a344df0efc3cb01db59da5 (patch) | |
tree | d0af1f8646d5448898a25a4cbac110fb78f7bbfe | |
parent | fb5c7144a4e9145612fd610fe5d9d2ede7697cc7 (diff) | |
download | sandhi-31a786ff101ef66cf9a344df0efc3cb01db59da5.tar.gz sandhi-31a786ff101ef66cf9a344df0efc3cb01db59da5.tar.bz2 sandhi-31a786ff101ef66cf9a344df0efc3cb01db59da5.zip |
history: output offset and holds for input consumption
-rw-r--r-- | lib/block_handlers.cpp | 1 | ||||
-rw-r--r-- | lib/block_task.cpp | 24 | ||||
-rw-r--r-- | lib/element_impl.hpp | 1 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 6 |
4 files changed, 24 insertions, 8 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index be53fe6..1a48ffb 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -115,6 +115,7 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) this->produce_items.resize(num_outputs, 0); this->input_queues.resize(num_inputs); this->output_queues.resize(num_outputs); + this->output_bytes_offset.resize(num_outputs, 0); this->input_tokens.resize(num_inputs); this->output_tokens.resize(num_outputs); diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 59be851..e436bc1 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -120,8 +120,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) ASSERT(this->output_queues.ready(i)); const tsbe::Buffer &buff = this->output_queues.front(i); - char *mem = ((char *)buff.get_memory()); - const size_t bytes = buff.get_length(); + char *mem = ((char *)buff.get_memory()) + this->output_bytes_offset[i]; + const size_t bytes = buff.get_length() - this->output_bytes_offset[i]; const size_t items = bytes/this->output_items_sizes[i]; this->work_io_ptr_mask |= ptrdiff_t(mem); @@ -162,6 +162,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //------------------------------------------------------------------ //-- process input consumption //------------------------------------------------------------------ + bool input_fully_consumed = true; for (size_t i = 0; i < num_inputs; i++) { ASSERT(enable_fixed_rate or ret != Block::WORK_CALLED_PRODUCE); @@ -170,7 +171,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; - this->input_queues.pop(i, bytes); + input_fully_consumed = input_fully_consumed and this->input_queues.pop(i, bytes); } //------------------------------------------------------------------ @@ -183,10 +184,19 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_produced[i] += items; const size_t bytes = items*this->output_items_sizes[i]; - tsbe::Buffer &buff = this->output_queues.front(i); - buff.get_length() = bytes; - task_iface.post_downstream(i, buff); - this->output_queues.pop(i); + this->output_bytes_offset[i] += bytes; + + //only pass output buffer downstream when the input is fully consumed... + //Reasoning: For the sake of dealling with history, we can process the mini history input buffer, + //and then call work again on the real input buffer, but still yield one output buffer per input buffer. + if (input_fully_consumed) + { + tsbe::Buffer &buff = this->output_queues.front(i); + buff.get_length() = this->output_bytes_offset[i]; + task_iface.post_downstream(i, buff); + this->output_queues.pop(i); + this->output_bytes_offset[i] = 0; + } } //------------------------------------------------------------------ diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index b78165c..6ecbbbd 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -84,6 +84,7 @@ struct ElementImpl //buffer queues and ready conditions InputBufferQueues input_queues; VectorOfQueues<tsbe::Buffer> output_queues; + std::vector<size_t> output_bytes_offset; //tag tracking std::vector<bool> input_tags_changed; diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 4faa690..3075348 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -120,8 +120,10 @@ struct InputBufferQueues * Otherwise, check if the input buffer was entirely consumed. * If so, pop the input buffer, copy the tail end of the buffer * into the mini history buffer, and reset the offset condition. + * + * \return true if the buffer is fully consumed */ - inline void pop(const size_t i, const size_t bytes_consumed) + inline bool pop(const size_t i, const size_t bytes_consumed) { _offset_bytes[i] += bytes_consumed + _history_bytes[i]; @@ -138,7 +140,9 @@ struct InputBufferQueues _queues[i].pop(); _bitset.set(i, not _queues[i].empty()); _offset_bytes[i] = 0; + return true; } + return false; } inline void resize(const size_t size) |