summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-09-03 22:22:04 -0700
committerJosh Blum2012-09-03 22:22:04 -0700
commit536e1452f2a6df78b8cfb62649105a6e9882bc13 (patch)
treed431a9fe99c6707ca612574a9be2e05556c9bf75 /lib
parent20fc18a2a9f4c47f8e782f15d82aaf5f49d7f0aa (diff)
downloadsandhi-536e1452f2a6df78b8cfb62649105a6e9882bc13.tar.gz
sandhi-536e1452f2a6df78b8cfb62649105a6e9882bc13.tar.bz2
sandhi-536e1452f2a6df78b8cfb62649105a6e9882bc13.zip
work on input queues and history logic
Diffstat (limited to 'lib')
-rw-r--r--lib/block_allocator.cpp2
-rw-r--r--lib/block_task.cpp10
-rw-r--r--lib/element.cpp2
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp18
4 files changed, 17 insertions, 15 deletions
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index 516729e..f98a0bd 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -38,7 +38,7 @@ void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface)
for (size_t i = 0; i < num_outputs; i++)
{
size_t items = this->hint;
- if (items == 0) items = 1024;
+ if (items == 0) items = 2048;
items = std::max(items, this->output_multiple_items[i]);
const size_t bytes = items * this->output_items_sizes[i];
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 0dcfca4..a649590 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -82,7 +82,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->input_queues.all_ready() and
this->output_queues.all_ready()
)) return;
- //std::cout << "calling work on " << name << std::endl;
+ //std::cout << "=== calling work on " << name << " ===" << std::endl;
const size_t num_inputs = task_iface.get_num_inputs();
const size_t num_outputs = task_iface.get_num_outputs();
@@ -130,8 +130,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
{
output_tokens_count += this->output_tokens[i].use_count();
- ASSERT(this->output_multiple_items[i] == 1);
-
ASSERT(this->output_queues.ready(i));
const tsbe::Buffer &buff = this->output_queues.front(i);
char *mem = ((char *)buff.get_memory()) + this->output_bytes_offset[i];
@@ -176,7 +174,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//------------------------------------------------------------------
//-- process input consumption
//------------------------------------------------------------------
- bool input_fully_consumed = true;
+ bool input_allows_flush = true;
for (size_t i = 0; i < num_inputs; i++)
{
ASSERT(enable_fixed_rate or ret != Block::WORK_CALLED_PRODUCE);
@@ -185,7 +183,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
- input_fully_consumed = input_fully_consumed and this->input_queues.pop(i, bytes);
+ input_allows_flush = input_allows_flush and this->input_queues.pop(i, bytes);
}
//------------------------------------------------------------------
@@ -203,7 +201,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//only pass output buffer downstream when the input is fully consumed...
//Reasoning: For the sake of dealling with history, we can process the mini history input buffer,
//and then call work again on the real input buffer, but still yield one output buffer per input buffer.
- if (input_fully_consumed)
+ if (input_allows_flush)
{
tsbe::Buffer &buff = this->output_queues.front(i);
buff.get_length() = this->output_bytes_offset[i];
diff --git a/lib/element.cpp b/lib/element.cpp
index 34cfcf4..db0bbac 100644
--- a/lib/element.cpp
+++ b/lib/element.cpp
@@ -30,7 +30,7 @@ Element::Element(void)
Element::Element(const std::string &name)
{
this->reset(new ElementImpl());
- //VAR(name);
+ VAR(name);
(*this)->name = name;
(*this)->unique_id = ++unique_id_pool;
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 3075348..563e5d0 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -100,14 +100,13 @@ struct InputBufferQueues
}
info.mem = (char *)hist_buff.get_memory() + _offset_bytes[i];
- info.len = hist_buff.get_length() - _offset_bytes[i];
+ info.len = hist_buff.get_length() - _offset_bytes[i] - _history_bytes[i];
}
else
{
- const size_t delta_bytes = _offset_bytes[i] - _history_bytes[i];
- info.mem = ((char *)buff.get_memory()) + delta_bytes;
- info.len = buff.get_length() - delta_bytes;
+ info.mem = ((char *)buff.get_memory()) + _offset_bytes[i] - _history_bytes[i];
+ info.len = buff.get_length() - _offset_bytes[i];
}
return info;
@@ -121,17 +120,22 @@ struct InputBufferQueues
* If so, pop the input buffer, copy the tail end of the buffer
* into the mini history buffer, and reset the offset condition.
*
- * \return true if the buffer is fully consumed
+ * \return true if the input allows output flushing
*/
inline bool pop(const size_t i, const size_t bytes_consumed)
{
- _offset_bytes[i] += bytes_consumed + _history_bytes[i];
+ _offset_bytes[i] += bytes_consumed;
//if totally consumed, memcpy history and pop
const tsbe::Buffer &buff = _queues[i].front();
if (_offset_bytes[i] >= buff.get_length())
{
ASSERT(_offset_bytes[i] == buff.get_length()); //bad to consume more than buffer allows
+ /*if (_offset_bytes[i] != buff.get_length())
+ {
+ VAR(_offset_bytes[i]);
+ VAR(buff.get_length());
+ }*/
if (_history_bytes[i] != 0)
{
char *src_mem = ((char *)buff.get_memory()) + _offset_bytes[i] - _history_bytes[i];
@@ -142,7 +146,7 @@ struct InputBufferQueues
_offset_bytes[i] = 0;
return true;
}
- return false;
+ return _history_bytes[i] == 0;
}
inline void resize(const size_t size)