diff options
author | Josh Blum | 2012-09-29 10:30:52 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-29 10:30:52 -0700 |
commit | f3194e33de2a6496ce9ea838681e62b64fa689f4 (patch) | |
tree | 37b80081a4978826b6bf56603ed0bd35520e4af4 /lib/block_task.cpp | |
parent | 17e39ddbb0940d9d5e687713531e9a18d18e29f1 (diff) | |
download | sandhi-f3194e33de2a6496ce9ea838681e62b64fa689f4.tar.gz sandhi-f3194e33de2a6496ce9ea838681e62b64fa689f4.tar.bz2 sandhi-f3194e33de2a6496ce9ea838681e62b64fa689f4.zip |
ported block task and tag handlers to apology
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 49 |
1 files changed, 25 insertions, 24 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index dda7dcc..176b161 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -14,25 +14,26 @@ // You should have received a copy of the GNU Lesser General Public License // along with io_sig program. If not, see <http://www.gnu.org/licenses/>. -#include "element_impl.hpp" +#include <gras_impl/block_actor.hpp> #include "tag_handlers.hpp" -#include <gras_impl/messages.hpp> #define REALLY_BIG size_t(1 << 30) using namespace gnuradio; -void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) +void BlockActor::mark_done(void) { if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first //flush partial output buffers to the downstream - for (size_t i = 0; i < task_iface.get_num_outputs(); i++) + for (size_t i = 0; i < this->get_num_outputs(); i++) { if (not this->output_queues.ready(i)) continue; SBuffer &buff = this->output_queues.front(i); if (buff.length == 0) continue; - task_iface.post_downstream(i, buff); + InputBufferMessage buff_msg; + buff_msg.buffer = buff; + this->post_downstream(i, buff_msg); this->output_queues.pop(i); } @@ -54,23 +55,23 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) //tell the upstream and downstram to re-check their tokens //this is how the other blocks know who is interested, //and can decide based on interest to set done or not - for (size_t i = 0; i < task_iface.get_num_inputs(); i++) + for (size_t i = 0; i < this->get_num_inputs(); i++) { - task_iface.post_upstream(i, CheckTokensMessage()); + this->post_upstream(i, OutputCheckMessage()); } - for (size_t i = 0; i < task_iface.get_num_outputs(); i++) + for (size_t i = 0; i < this->get_num_outputs(); i++) { - task_iface.post_downstream(i, CheckTokensMessage()); + this->post_downstream(i, InputCheckMessage()); } if (ARMAGEDDON) std::cerr << "==================================================\n" - << "== The " << name << " " << unique_id << " is done...\n" + << "== The " << block_ptr->to_string() << " is done...\n" << "==================================================\n" << std::flush; } -void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) +void BlockActor::handle_task(void) { #ifdef WORK_DEBUG WorkDebugPrinter WDP(this->name); @@ -87,8 +88,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->output_queues.all_ready() )) return; - const size_t num_inputs = task_iface.get_num_inputs(); - const size_t num_outputs = task_iface.get_num_outputs(); + const size_t num_inputs = this->get_num_inputs(); + const size_t num_outputs = this->get_num_outputs(); //const bool is_source = (num_inputs == 0); //const bool is_sink = (num_outputs == 0); this->work_io_ptr_mask = 0; //reset @@ -162,7 +163,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //if we have outputs and at least one port has no downstream subscibers, mark done if (outputs_done) { - this->mark_done(task_iface); + this->mark_done(); return; } @@ -186,7 +187,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) if (num_output_items) goto forecast_again_you_jerk; this->forecast_fail = true; - this->conclusion(task_iface, inputs_done); + this->conclusion(inputs_done); return; } } @@ -198,7 +199,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) work_noutput_items = num_output_items; if (this->enable_fixed_rate) work_noutput_items = std::min( work_noutput_items, myulround((num_input_items)*this->relative_rate)); - this->work_task_iface = task_iface; this->work_ret = -1; if (this->interruptible_thread) { @@ -208,12 +208,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) { this->task_work(); } - this->work_task_iface.reset(); const size_t noutput_items = size_t(work_ret); if (work_ret == Block::WORK_DONE) { - this->mark_done(task_iface); + this->mark_done(); return; } @@ -230,7 +229,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) const size_t bytes = items*this->input_items_sizes[i]; this->input_queues.consume(i, bytes); - this->trim_tags(task_iface, i); + this->trim_tags(i); } //------------------------------------------------------------------ @@ -249,7 +248,9 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //dont always pass output buffers downstream for the sake of efficiency if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length()) { - task_iface.post_downstream(i, buff); + InputBufferMessage buff_msg; + buff_msg.buffer = buff; + this->post_downstream(i, buff_msg); this->output_queues.pop(i); } } @@ -257,23 +258,23 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //------------------------------------------------------------------ //-- Message self based on post-work conditions //------------------------------------------------------------------ - this->conclusion(task_iface, inputs_done); + this->conclusion(inputs_done); } -GRAS_FORCE_INLINE void ElementImpl::conclusion(const tsbe::TaskInterface &task_iface, const bool inputs_done) +GRAS_FORCE_INLINE void BlockActor::conclusion(const bool inputs_done) { //if there are inputs, and not all are provided for, //tell the block to check input queues and handle done if (inputs_done) { - this->block.post_msg(CheckTokensMessage()); + this->Push(CheckTokensMessage(), Theron::Address()); return; } //still have IO ready? kick off another task if (this->input_queues.all_ready() and this->output_queues.all_ready()) { - this->block.post_msg(SelfKickMessage()); + this->Push(SelfKickMessage(), Theron::Address()); return; } } |