summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-10-13 13:22:18 -0700
committerJosh Blum2012-10-13 13:22:18 -0700
commit8354c9c4c92687904cedae0b0feb702c2d7e8408 (patch)
tree4c063baf0dfa3a06a6f090b2346b593babaa2efb /lib
parentf9c854d3cdd0a9337b41842d15a13d6df4ece408 (diff)
downloadsandhi-8354c9c4c92687904cedae0b0feb702c2d7e8408.tar.gz
sandhi-8354c9c4c92687904cedae0b0feb702c2d7e8408.tar.bz2
sandhi-8354c9c4c92687904cedae0b0feb702c2d7e8408.zip
lots of mini tweaks for QA passing, WIP...
Diffstat (limited to 'lib')
-rw-r--r--lib/block_allocator.cpp6
-rw-r--r--lib/block_handlers.cpp16
-rw-r--r--lib/block_task.cpp14
-rw-r--r--lib/gr_block.cpp3
-rw-r--r--lib/gras_impl/block_actor.hpp2
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp4
-rw-r--r--lib/gras_impl/output_buffer_queues.hpp4
-rw-r--r--lib/input_handlers.cpp1
-rw-r--r--lib/top_block.cpp2
-rw-r--r--lib/topology_handler.cpp3
10 files changed, 23 insertions, 32 deletions
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index 9b70824..05b35ac 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -78,12 +78,6 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
{
MESSAGE_TRACER();
- if (this->block_state == BLOCK_STATE_DONE)
- {
- this->Send(0, from); //ACK
- return;
- }
-
//allocate output buffers which will also wake up the task
const size_t num_outputs = this->get_num_outputs();
this->output_buffer_tokens.resize(num_outputs);
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 2c14643..c9b289b 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -27,12 +27,6 @@ void BlockActor::handle_top_active(
){
MESSAGE_TRACER();
- if (this->block_state == BLOCK_STATE_DONE)
- {
- this->Send(0, from); //ACK
- return;
- }
-
if (this->block_state != BLOCK_STATE_LIVE)
{
this->block_ptr->start();
@@ -50,10 +44,6 @@ void BlockActor::handle_top_inert(
){
MESSAGE_TRACER();
- if (this->block_state != BLOCK_STATE_DONE)
- {
- this->block_ptr->stop();
- }
this->mark_done();
this->Send(0, from); //ACK
@@ -65,12 +55,6 @@ void BlockActor::handle_top_token(
){
MESSAGE_TRACER();
- if (this->block_state == BLOCK_STATE_DONE)
- {
- this->Send(0, from); //ACK
- return;
- }
-
//create input tokens and send allocation hints
for (size_t i = 0; i < this->get_num_inputs(); i++)
{
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 66669d1..1570064 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -25,6 +25,8 @@ void BlockActor::mark_done(void)
{
if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
+ this->block_ptr->stop();
+
//flush partial output buffers to the downstream
for (size_t i = 0; i < this->get_num_outputs(); i++)
{
@@ -103,15 +105,19 @@ void BlockActor::handle_task(void)
ASSERT(this->input_queues.ready(i));
bool potential_inline;
- const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate, potential_inline);
+ const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate and num_outputs, potential_inline);
void *mem = buff.get();
- const size_t items = buff.length/this->input_items_sizes[i];
+ size_t items = buff.length/this->input_items_sizes[i];
this->work_io_ptr_mask |= ptrdiff_t(mem);
this->input_items[i].get() = mem;
this->input_items[i].size() = items;
this->work_input_items[i] = mem;
this->work_ninput_items[i] = items;
+ if (this->enable_fixed_rate)
+ {
+ items = std::max<int>(0, int(items) - int(this->input_configs[i].lookahead_items));
+ }
num_input_items = std::min(num_input_items, items);
this->consume_items[i] = 0;
this->consume_called[i] = false;
@@ -184,11 +190,12 @@ void BlockActor::handle_task(void)
{
if (fcast_ninput_items[i] <= work_ninput_items[i]) continue;
+ const size_t work_noutput_items_last = work_noutput_items;
work_noutput_items = work_noutput_items/2; //backoff regime
work_noutput_items += this->output_multiple_items-1;
work_noutput_items /= this->output_multiple_items;
work_noutput_items *= this->output_multiple_items;
- if (work_noutput_items) goto forecast_again_you_jerk;
+ if (work_noutput_items and work_noutput_items_last != work_noutput_items) goto forecast_again_you_jerk;
//handle the case of forecast failing
//TODO accumulate input here, only done if inputs done and already accumulated
@@ -210,6 +217,7 @@ void BlockActor::handle_task(void)
this->task_work();
}
const size_t noutput_items = size_t(work_ret);
+ VAR(work_ret);
if (work_ret == Block::WORK_DONE)
{
diff --git a/lib/gr_block.cpp b/lib/gr_block.cpp
index 834bbd5..a384af1 100644
--- a/lib/gr_block.cpp
+++ b/lib/gr_block.cpp
@@ -80,7 +80,7 @@ bool gr_block::is_unaligned(void)
size_t gr_block::fixed_rate_noutput_to_ninput(const size_t noutput_items)
{
- return size_t((noutput_items/this->relative_rate()));
+ return size_t(0.5 + (noutput_items/this->relative_rate())) + this->history() - 1;
}
size_t gr_block::interpolation(void) const
@@ -91,6 +91,7 @@ size_t gr_block::interpolation(void) const
void gr_block::set_interpolation(const size_t interp)
{
this->set_relative_rate(1.0*interp);
+ this->set_output_multiple(interp);
}
size_t gr_block::decimation(void) const
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 5dd7421..b77861b 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -114,7 +114,7 @@ struct BlockActor : Apology::Worker
void trim_tags(const size_t index);
GRAS_FORCE_INLINE bool any_inputs_done(void)
{
- if (this->inputs_done.none()) return false;
+ if (this->inputs_done.none() or this->input_queues.all_ready()) return false;
for (size_t i = 0; i < this->get_num_inputs(); i++)
{
if (this->inputs_done[i] and not this->input_queues.ready(i))
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index a3ca1dc..3666330 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -152,6 +152,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::init(
_history_bytes[i] + _reserve_bytes[i],
_reserve_bytes[i]
);
+ _reserve_bytes[i] = 1;
//post bytes are the desired buffer size to escape the edge case
_post_bytes[i] = std::max(
@@ -202,7 +203,8 @@ GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, const bool co
//same buffer, different offset and length
SBuffer buff = front;
- if (conserve_history) buff.length -= _history_bytes[i];
+ //if (conserve_history) ASSERT(buff.length >= _history_bytes[i]);
+ //if (conserve_history) buff.length -= _history_bytes[i];
//set the flag that this buffer *might* be inlined as an output buffer
potential_inline = unique and (buff.length == front.length);
diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp
index a740b2d..1f07af0 100644
--- a/lib/gras_impl/output_buffer_queues.hpp
+++ b/lib/gras_impl/output_buffer_queues.hpp
@@ -76,9 +76,7 @@ struct OutputBufferQueues
GRAS_FORCE_INLINE void flush_all(void)
{
- _queues.clear();
- _queues.resize(_bitset.size());
- _bitset.reset();
+ for (size_t i = 0; i < this->size(); i++) this->flush(i);
}
GRAS_FORCE_INLINE bool ready(const size_t i) const
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index 30bb613..c2abe73 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -33,6 +33,7 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th
{
MESSAGE_TRACER();
const size_t index = message.index;
+ VAR(this->input_tokens[index].use_count());
//handle incoming stream buffer, push into the queue
if (this->block_state == BLOCK_STATE_DONE) return;
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index bcce1bf..d71a4a7 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -47,6 +47,8 @@ TopBlock::TopBlock(const std::string &name):
void ElementImpl::top_block_cleanup(void)
{
this->executor->post_all(TopInertMessage());
+ this->topology->clear_all();;
+ this->executor->commit();
if (ARMAGEDDON) std::cerr
<< "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n"
<< "xx Top Block Destroyed: " << name << "\n"
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index cc57ed6..ae9f040 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -114,7 +114,8 @@ void BlockActor::handle_update_inputs(
for (size_t i = 0; i < num_inputs; i++)
{
input_lookahead_items[i] = this->input_configs[i].lookahead_items;
- this->input_reserve_items[i] = size_t(std::ceil(this->output_multiple_items/this->relative_rate));
+ if (this->enable_fixed_rate)
+ this->input_reserve_items[i] = size_t(std::ceil(this->output_multiple_items/this->relative_rate));
if (this->input_reserve_items[i] == 0) this->input_reserve_items[i] = 1;
}