diff options
author | Josh Blum | 2012-10-13 14:58:18 -0700 |
---|---|---|
committer | Josh Blum | 2012-10-13 14:58:18 -0700 |
commit | b0e1ca63f8db775557d155b6b7d703f3bf251386 (patch) | |
tree | 618376dd17123da88741f7fbb733651f2cf80b09 | |
parent | 8354c9c4c92687904cedae0b0feb702c2d7e8408 (diff) | |
download | sandhi-b0e1ca63f8db775557d155b6b7d703f3bf251386.tar.gz sandhi-b0e1ca63f8db775557d155b6b7d703f3bf251386.tar.bz2 sandhi-b0e1ca63f8db775557d155b6b7d703f3bf251386.zip |
handle_update_inputs also performs resize, cleanup input fixed rate logic
-rw-r--r-- | TODO.txt | 5 | ||||
-rw-r--r-- | lib/block_task.cpp | 12 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 2 |
3 files changed, 18 insertions, 1 deletions
@@ -37,3 +37,8 @@ and not directly from block::<method> * want per-port token_pool for inputs and outputs ** if ports get removed, this will release subscribers + +* forecast + input buffer accumulate + +* token check for top block stop +** fixes qa UDP so we dont wait for inert message diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 1570064..fac70c0 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -114,10 +114,17 @@ void BlockActor::handle_task(void) this->input_items[i].size() = items; this->work_input_items[i] = mem; this->work_ninput_items[i] = items; + if (this->enable_fixed_rate) { - items = std::max<int>(0, int(items) - int(this->input_configs[i].lookahead_items)); + if (items <= this->input_configs[i].lookahead_items) + { + if (this->inputs_done[i]) this->mark_done(); + return; + } + items -= this->input_configs[i].lookahead_items; } + num_input_items = std::min(num_input_items, items); this->consume_items[i] = 0; this->consume_called[i] = false; @@ -181,6 +188,7 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- forecast //------------------------------------------------------------------ + VAR(work_noutput_items); if (this->forecast_enable) { forecast_again_you_jerk: @@ -207,6 +215,8 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ + VAR(work_noutput_items); + if (num_inputs) VAR(work_ninput_items[0]); this->work_ret = -1; if (this->interruptible_thread) { diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index ae9f040..eba9bca 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -106,7 +106,9 @@ void BlockActor::handle_update_inputs( const UpdateInputsMessage &, const Theron::Address ){ + MESSAGE_TRACER(); const size_t num_inputs = this->get_num_inputs(); + this->input_queues.resize(num_inputs); //impose input reserve requirements based on relative rate and output multiple resize_fill_grow(this->input_reserve_items, num_inputs, 1); |