summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-09-02 14:23:28 -0700
committerJosh Blum2012-09-02 14:23:28 -0700
commit3522678e9ed21ba1e880b19762669fe2b0c44064 (patch)
tree796d177a494a8c4a0127eba88fdf802bf902722b /lib
parent0a988ab506d3489e13222e9c7ddff889c6371ee6 (diff)
downloadsandhi-3522678e9ed21ba1e880b19762669fe2b0c44064.tar.gz
sandhi-3522678e9ed21ba1e880b19762669fe2b0c44064.tar.bz2
sandhi-3522678e9ed21ba1e880b19762669fe2b0c44064.zip
checking in TODO and minor changes
Diffstat (limited to 'lib')
-rw-r--r--lib/block_handlers.cpp12
-rw-r--r--lib/block_task.cpp23
-rw-r--r--lib/element_impl.hpp2
3 files changed, 28 insertions, 9 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index fcaa359..396416d 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -125,18 +125,28 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
this->input_tags.resize(num_inputs);
this->output_tags.resize(num_outputs);
+ //determine max history length for allocation below
+ this->max_history_items = *std::max_element(
+ this->input_history_items.begin(),
+ this->input_history_items.end()
+ );
+
//resize and clear that initial history
this->history_buffs.resize(num_inputs);
+ this->input_history_bytes.resize(num_inputs);
for (size_t i = 0; i < num_inputs; i++)
{
tsbe::Buffer &buff = this->history_buffs[i];
- const size_t num_bytes = this->input_items_sizes[i]*this->input_history_items[i];
+ const size_t num_items = this->input_history_items[i] + this->max_history_items;
+ const size_t num_bytes = this->input_items_sizes[i]*num_items;
+ this->input_history_bytes[i] = num_bytes;
if (not buff or buff.get_length() != num_bytes)
{
tsbe::BufferConfig config;
config.memory = NULL;
config.length = num_bytes;
buff = tsbe::Buffer(config);
+ std::memset(buff.get_memory(), 0, buff.get_length());
}
}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index e3efb91..a84ef6a 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -93,18 +93,16 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
for (size_t i = 0; i < num_inputs; i++)
{
input_tokens_count += this->input_tokens[i].use_count();
- //this->consume_items[i] = 0;
ASSERT(this->input_history_items[i] == 0);
- ASSERT(this->input_queues.ready(i));
+ ASSERT(this->input_queues.ready(i));
const tsbe::Buffer &buff = this->input_queues.front(i);
+ ASSERT(this->input_buff_offsets[i] < buff.get_length());
char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i];
const size_t bytes = buff.get_length() - this->input_buff_offsets[i];
const size_t items = bytes/this->input_items_sizes[i];
- ASSERT(this->input_buff_offsets[i] < buff.get_length());
-
this->work_io_ptr_mask |= ptrdiff_t(mem);
this->input_items[i]._mem = mem;
this->input_items[i]._len = items;
@@ -120,10 +118,10 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
for (size_t i = 0; i < num_outputs; i++)
{
output_tokens_count += this->output_tokens[i].use_count();
- //this->produce_items[i] = 0;
- ASSERT(this->output_queues.ready(i));
+ ASSERT(this->output_multiple_items[i] == 1);
+ ASSERT(this->output_queues.ready(i));
const tsbe::Buffer &buff = this->output_queues.front(i);
char *mem = ((char *)buff.get_memory());
const size_t bytes = buff.get_length();
@@ -137,13 +135,22 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
}
//if we have outputs and at least one port has no downstream subscibers, mark done
- if ((num_outputs != 0 and output_tokens_count == num_outputs)){
+ if ((num_outputs != 0 and output_tokens_count == num_outputs))
+ {
this->mark_done(task_iface);
return;
}
//------------------------------------------------------------------
- //-- forecast (TODO) and work
+ //-- forecast
+ //------------------------------------------------------------------
+ if (not this->enable_fixed_rate)
+ {
+ block_ptr->forecast(num_output_items, work_ninput_items);
+ }
+
+ //------------------------------------------------------------------
+ //-- the work
//------------------------------------------------------------------
const int ret = block_ptr->Work(this->input_items, this->output_items);
const size_t noutput_items = size_t(ret);
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 333bfae..6d49033 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -74,7 +74,9 @@ struct ElementImpl
std::vector<size_t> input_buff_offsets;
//special buffer for dealing with history
+ std::vector<size_t> input_history_bytes;
std::vector<tsbe::Buffer> history_buffs;
+ size_t max_history_items;
//track the subscriber counts
std::vector<Token> input_tokens;